You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/16 15:38:20 UTC
[1/6] storm git commit: STORM-1549: Add support for resetting tuple
timeout from bolts via the OutputCollector
Repository: storm
Updated Branches:
refs/heads/1.x-branch f0abfff92 -> 53e1ab0c6
STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/406052cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/406052cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/406052cd
Branch: refs/heads/1.x-branch
Commit: 406052cdc7138046a79104fc5c6f72212415f7f2
Parents: 5fd3121
Author: Stig Rohde Døssing <ge...@gmail.com>
Authored: Sun Feb 14 02:39:42 2016 +0100
Committer: Stig Rohde Døssing <ge...@gmail.com>
Committed: Sun Feb 14 15:17:37 2016 +0100
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/clojure.clj | 3 ++
.../src/clj/org/apache/storm/daemon/acker.clj | 10 +++-
.../src/clj/org/apache/storm/daemon/common.clj | 9 +++-
.../clj/org/apache/storm/daemon/executor.clj | 10 ++++
.../storm/coordination/CoordinatedBolt.java | 4 ++
.../org/apache/storm/task/IOutputCollector.java | 1 +
.../org/apache/storm/task/OutputCollector.java | 10 ++++
.../storm/topology/BasicOutputCollector.java | 4 ++
.../storm/topology/IBasicOutputCollector.java | 2 +
.../trident/topology/TridentBoltExecutor.java | 4 ++
.../org/apache/storm/integration_test.clj | 51 ++++++++++++++++++--
11 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/clojure.clj b/storm-core/src/clj/org/apache/storm/clojure.clj
index ff33829..6e88cb6 100644
--- a/storm-core/src/clj/org/apache/storm/clojure.clj
+++ b/storm-core/src/clj/org/apache/storm/clojure.clj
@@ -173,6 +173,9 @@
(defn fail! [collector ^Tuple tuple]
(.fail ^OutputCollector (:output-collector collector) tuple))
+(defn reset-timeout! [collector ^Tuple tuple]
+ (.resetTimeout ^OutputCollector (:output-collector collector) tuple))
+
(defn report-error! [collector ^Tuple tuple]
(.reportError ^OutputCollector (:output-collector collector) tuple))
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 7c4d614..7c29f46 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -30,6 +30,7 @@
(def ACKER-INIT-STREAM-ID "__ack_init")
(def ACKER-ACK-STREAM-ID "__ack_ack")
(def ACKER-FAIL-STREAM-ID "__ack_fail")
+(def ACKER-RESET-TIMEOUT-STREAM-ID "__ack_reset_timeout")
(defn- update-ack [curr-entry val]
(let [old (get curr-entry :val 0)]
@@ -61,7 +62,8 @@
(update-ack (.getValue tuple 1))
(assoc :spout-task (.getValue tuple 2)))
ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
- ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
+ ACKER-FAIL-STREAM-ID (assoc curr :failed true)
+ ACKER-RESET-TIMEOUT-STREAM-ID curr)]
(.put pending id curr)
(when (and curr (:spout-task curr))
(cond (= 0 (:val curr))
@@ -80,6 +82,12 @@
ACKER-FAIL-STREAM-ID
[id]
))
+ (= stream-id ACKER-RESET-TIMEOUT-STREAM-ID)
+ (acker-emit-direct output-collector
+ (:spout-task curr)
+ ACKER-RESET-TIMEOUT-STREAM-ID
+ [id]
+ )
))
(.ack output-collector tuple)
))))
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index c1e261f..55bc030 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -48,6 +48,7 @@
(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
+(def ACKER-RESET-TIMEOUT-STREAM-ID acker/ACKER-RESET-TIMEOUT-STREAM-ID)
(def SYSTEM-STREAM-ID "__system")
@@ -195,7 +196,8 @@
bolt-inputs (apply merge
(for [id bolt-ids]
{[id ACKER-ACK-STREAM-ID] ["id"]
- [id ACKER-FAIL-STREAM-ID] ["id"]}
+ [id ACKER-FAIL-STREAM-ID] ["id"]
+ [id ACKER-RESET-TIMEOUT-STREAM-ID] ["id"]}
))]
(merge spout-inputs bolt-inputs)))
@@ -221,6 +223,7 @@
(new org.apache.storm.daemon.acker)
{ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
+ ACKER-RESET-TIMEOUT-STREAM-ID (thrift/direct-output-fields ["id"])
}
:p num-executors
:conf {TOPOLOGY-TASKS num-executors
@@ -230,6 +233,7 @@
(do
(.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
(.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"]))
+ (.put_to_streams common ACKER-RESET-TIMEOUT-STREAM-ID (thrift/output-fields ["id"]))
))
(dofor [[_ spout] (.get_spouts ret)
:let [common (.get_common spout)
@@ -246,6 +250,9 @@
(.put_to_inputs common
(GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
(thrift/mk-direct-grouping))
+ (.put_to_inputs common
+ (GlobalStreamId. ACKER-COMPONENT-ID ACKER-RESET-TIMEOUT-STREAM-ID)
+ (thrift/mk-direct-grouping))
))
(.put_to_bolts ret "__acker" acker-bolt)
))
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 251387b..0d48548 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -519,6 +519,11 @@
spout-obj (:object task-data)]
(when (instance? ICredentialsListener spout-obj)
(.setCredentials spout-obj (.getValue tuple 0))))
+ ACKER-RESET-TIMEOUT-STREAM-ID
+ (let [id (.getValue tuple 0)
+ pending-for-id (.get pending id)]
+ (when pending-for-id
+ (.put pending id pending-for-id)))
(let [id (.getValue tuple 0)
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
@@ -828,6 +833,11 @@
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
+ (^void resetTimeout [this ^Tuple tuple]
+ (fast-list-iter [root (.. tuple getMessageId getAnchors)]
+ (task/send-unanchored task-data
+ ACKER-RESET-TIMEOUT-STREAM-ID
+ [root])))
(reportError [this error]
(report-error error)
)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
index ee66b09..15ac5e2 100644
--- a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -124,6 +124,10 @@ public class CoordinatedBolt implements IRichBolt {
checkFinishId(tuple, TupleType.REGULAR);
_delegate.fail(tuple);
}
+
+ public void resetTimeout(Tuple tuple) {
+ _delegate.resetTimeout(tuple);
+ }
public void reportError(Throwable error) {
_delegate.reportError(error);
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
index cbbe108..cda4d9f 100644
--- a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
@@ -29,4 +29,5 @@ public interface IOutputCollector extends IErrorReporter {
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
+ void resetTimeout(Tuple input);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index e6e54ac..071d8aa 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -218,6 +218,16 @@ public class OutputCollector implements IOutputCollector {
_delegate.fail(input);
}
+ /**
+ * Resets the message timeout for any tuple trees to which the given tuple belongs.
+ * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * @param input the tuple to reset timeout for
+ */
+ @Override
+ public void resetTimeout(Tuple input) {
+ _delegate.resetTimeout(input);
+ }
+
@Override
public void reportError(Throwable error) {
_delegate.reportError(error);
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index cedc7c9..343c349 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,10 @@ public class BasicOutputCollector implements IBasicOutputCollector {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
+ public void resetTimeout(Tuple tuple){
+ out.resetTimeout(tuple);
+ }
+
protected IOutputCollector getOutputter() {
return out;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
index 60da48a..7b7c9fc 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
@@ -18,10 +18,12 @@
package org.apache.storm.topology;
import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.tuple.Tuple;
import java.util.List;
public interface IBasicOutputCollector extends IErrorReporter{
List<Integer> emit(String streamId, List<Object> tuple);
void emitDirect(int taskId, String streamId, List<Object> tuple);
+ void resetTimeout(Tuple tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
index d85d217..41feb12 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
@@ -180,6 +180,10 @@ public class TridentBoltExecutor implements IRichBolt {
public void fail(Tuple tuple) {
throw new IllegalStateException("Method should never be called");
}
+
+ public void resetTimeout(Tuple tuple) {
+ throw new IllegalStateException("Method should never be called");
+ }
public void reportError(Throwable error) {
_delegate.reportError(error);
http://git-wip-us.apache.org/repos/asf/storm/blob/406052cd/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index cd2bc26..238e0db 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -20,6 +20,7 @@
(:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
+ (:import [org.apache.storm.utils Time])
(:import [org.apache.storm.tuple Fields])
(:use [org.apache.storm testing config clojure util])
(:use [org.apache.storm.daemon common])
@@ -83,9 +84,18 @@
(ack! collector tuple)
))))))
-(defn assert-loop [afn ids]
- (while (not (every? afn ids))
- (Thread/sleep 1)))
+(defn assert-loop
+([afn ids] (assert-loop afn ids 10))
+([afn ids timeout-secs]
+ (loop [remaining-time (* timeout-secs 1000)]
+ (let [start-time (System/currentTimeMillis)
+ assertion-is-true (every? afn ids)]
+ (if (or assertion-is-true (neg? remaining-time))
+ (is assertion-is-true)
+ (do
+ (Thread/sleep 1)
+ (recur (- remaining-time (- (System/currentTimeMillis) start-time)))
+ ))))))
(defn assert-acked [tracker & ids]
(assert-loop #(.isAcked tracker %) ids))
@@ -116,6 +126,41 @@
(assert-failed tracker 2)
)))
+(defbolt extend-timeout-twice {} {:prepare true}
+ [conf context collector]
+ (let [state (atom -1)]
+ (bolt
+ (execute [tuple]
+ (do
+ (Time/sleep (* 8 1000))
+ (reset-timeout! collector tuple)
+ (Time/sleep (* 8 1000))
+ (reset-timeout! collector tuple)
+ (Time/sleep (* 8 1000))
+ (ack! collector tuple)
+ )))))
+
+(deftest test-reset-timeout
+ (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (thrift/mk-bolt-spec {"1" :global} extend-timeout-twice)})]
+ (submit-local-topology (:nimbus cluster)
+ "timeout-tester"
+ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+ topology)
+ (advance-cluster-time cluster 11)
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 21)
+ (is (not (.isFailed tracker 1)))
+ (is (not (.isAcked tracker 1)))
+ (advance-cluster-time cluster 5)
+ (assert-acked tracker 1)
+ )))
+
(defn mk-validate-topology-1 []
(thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
[4/6] storm git commit: Merge branch '1.x-branch' of
https://github.com/apache/storm into 1.x-branch
Posted by bo...@apache.org.
Merge branch '1.x-branch' of https://github.com/apache/storm into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e701a2a1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e701a2a1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e701a2a1
Branch: refs/heads/1.x-branch
Commit: e701a2a1d3438479322da40b5011e8c595ae0295
Parents: e2251c4 f0abfff
Author: Stig Døssing <st...@gmail.com>
Authored: Tue Mar 15 21:33:17 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Tue Mar 15 21:33:17 2016 +0100
----------------------------------------------------------------------
.gitignore | 2 +
CHANGELOG.md | 29 +++
README.markdown | 1 +
bin/flight.bash | 4 +-
bin/storm-config.cmd | 4 +
bin/storm.cmd | 2 +-
conf/defaults.yaml | 2 +-
.../travis/print-errors-from-test-reports.py | 4 +
dev-tools/travis/travis-script.sh | 4 +-
.../storm/starter/ThroughputVsLatency.java | 2 +-
.../storm/starter/spout/RandomIntegerSpout.java | 15 +-
.../spout/RandomNumberGeneratorSpout.java | 95 +++++++++
.../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++++++++++
.../TridentMinMaxOfVehiclesTopology.java | 180 +++++++++++++++++
.../src/jvm/storm/starter/StatefulTopology.java | 1 +
.../apache/storm/sql/compiler/CompilerUtil.java | 7 +-
.../apache/storm/sql/compiler/ExprCompiler.java | 32 ++-
.../backends/standalone/RelNodeCompiler.java | 6 +-
.../apache/storm/sql/parser/StormParser.java | 5 +
.../test/org/apache/storm/sql/TestStormSql.java | 64 +++++-
.../storm/sql/compiler/TestCompilerUtils.java | 62 +++++-
.../storm/sql/compiler/TestExprSemantic.java | 18 ++
.../backends/standalone/TestPlanCompiler.java | 20 ++
.../backends/trident/TestPlanCompiler.java | 4 +-
.../test/org/apache/storm/sql/TestUtils.java | 32 ++-
.../storm/hbase/security/HBaseSecurityUtil.java | 36 ++--
external/storm-hdfs/pom.xml | 23 ++-
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 10 +-
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 8 +-
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 8 +-
.../storm/hdfs/bolt/SequenceFileBolt.java | 8 +-
.../apache/storm/kafka/PartitionManager.java | 12 +-
external/storm-mqtt/core/pom.xml | 4 +-
pom.xml | 13 +-
storm-core/pom.xml | 2 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 11 +-
storm-core/src/clj/org/apache/storm/config.clj | 4 +
.../clj/org/apache/storm/daemon/executor.clj | 2 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 39 ++--
.../clj/org/apache/storm/daemon/supervisor.clj | 17 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 8 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 20 +-
storm-core/src/clj/org/apache/storm/util.clj | 4 -
storm-core/src/jvm/org/apache/storm/Config.java | 13 ++
.../jvm/org/apache/storm/drpc/DRPCSpout.java | 2 +
.../apache/storm/messaging/netty/Client.java | 34 ++++
.../storm/metric/FileBasedEventLogger.java | 45 +++--
.../auth/AbstractSaslClientCallbackHandler.java | 76 +++++++
.../auth/AbstractSaslServerCallbackHandler.java | 94 +++++++++
.../apache/storm/security/auth/AuthUtils.java | 40 ++++
.../security/auth/SimpleTransportPlugin.java | 2 +
.../security/auth/ThriftConnectionType.java | 8 +-
.../auth/digest/ClientCallbackHandler.java | 60 +-----
.../auth/digest/ServerCallbackHandler.java | 61 +-----
.../storm/security/auth/kerberos/AutoTGT.java | 64 +++---
.../auth/kerberos/AutoTGTKrb5LoginModule.java | 8 +-
.../auth/plain/PlainClientCallbackHandler.java | 31 +++
.../auth/plain/PlainSaslTransportPlugin.java | 71 +++++++
.../auth/plain/PlainServerCallbackHandler.java | 55 +++++
.../security/auth/plain/SaslPlainServer.java | 158 +++++++++++++++
.../serialization/SerializationFactory.java | 2 +
.../testing/SingleUserSimpleTransport.java | 5 +-
.../topology/CheckpointTupleForwarder.java | 22 +-
.../apache/storm/topology/IStatefulBolt.java | 7 +-
.../storm/topology/StatefulBoltExecutor.java | 46 ++++-
.../apache/storm/topology/TopologyBuilder.java | 18 +-
.../jvm/org/apache/storm/trident/Stream.java | 121 +++++++++--
.../operation/builtin/ComparisonAggregator.java | 91 +++++++++
.../storm/trident/operation/builtin/Max.java | 37 ++++
.../operation/builtin/MaxWithComparator.java | 51 +++++
.../storm/trident/operation/builtin/Min.java | 36 ++++
.../operation/builtin/MinWithComparator.java | 51 +++++
.../apache/storm/trident/tuple/ConsList.java | 20 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 21 +-
.../storm/utils/WorkerBackpressureCallback.java | 3 +-
.../storm/utils/WorkerBackpressureThread.java | 38 +++-
storm-core/src/ui/public/component.html | 2 +-
.../templates/topology-page-template.html | 6 +-
storm-core/src/ui/public/topology.html | 2 +-
.../security/auth/auto_login_module_test.clj | 24 ++-
.../clj/org/apache/storm/supervisor_test.clj | 3 +
.../topology/StatefulBoltExecutorTest.java | 1 +
.../storm/topology/TopologyBuilderTest.java | 65 ++++++
.../utils/WorkerBackpressureThreadTest.java | 50 +++++
84 files changed, 2225 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e701a2a1/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
[3/6] storm git commit: Fix javadoc param name
Posted by bo...@apache.org.
Fix javadoc param name
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e2251c45
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e2251c45
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e2251c45
Branch: refs/heads/1.x-branch
Commit: e2251c4506f681abffa5e59baed02a567f8d4264
Parents: 94a93d7
Author: Stig Døssing <st...@gmail.com>
Authored: Wed Mar 2 18:02:46 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Wed Mar 2 18:02:46 2016 +0100
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/topology/BasicOutputCollector.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e2251c45/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 1d1e5ff..2cf1e82 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -56,7 +56,7 @@ public class BasicOutputCollector implements IBasicOutputCollector {
* Resets the message timeout for any tuple trees to which the given tuple belongs.
* The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
* Note that this is an expensive operation, and should be used sparingly.
- * @param input the tuple to reset timeout for
+ * @param tuple the tuple to reset timeout for
*/
public void resetTimeout(Tuple tuple){
out.resetTimeout(tuple);
[6/6] storm git commit: Added STORM-1549 to Changelog
Posted by bo...@apache.org.
Added STORM-1549 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/53e1ab0c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/53e1ab0c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/53e1ab0c
Branch: refs/heads/1.x-branch
Commit: 53e1ab0c6efd79627bf411e153caac01efe281f0
Parents: 887ec81
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Mar 16 09:35:56 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Mar 16 09:36:29 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/53e1ab0c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a959dbe..5562ee1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
* STORM-971: Metric for messages lost due to kafka retention
* STORM-1608: Fix stateful topology acking behavior
* STORM-1609: Netty Client is not best effort delivery on failed Connection
[2/6] storm git commit: Add javadoc about reset timeout being
expensive
Posted by bo...@apache.org.
Add javadoc about reset timeout being expensive
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/94a93d7e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/94a93d7e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/94a93d7e
Branch: refs/heads/1.x-branch
Commit: 94a93d7e6f7d7e1ed3eac4b367c5378c95e3ca2c
Parents: 406052c
Author: Stig Døssing <st...@gmail.com>
Authored: Wed Mar 2 17:58:52 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Wed Mar 2 17:58:52 2016 +0100
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/task/OutputCollector.java | 1 +
.../jvm/org/apache/storm/topology/BasicOutputCollector.java | 6 ++++++
2 files changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/94a93d7e/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index 071d8aa..4db87f0 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -221,6 +221,7 @@ public class OutputCollector implements IOutputCollector {
/**
* Resets the message timeout for any tuple trees to which the given tuple belongs.
* The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * Note that this is an expensive operation, and should be used sparingly.
* @param input the tuple to reset timeout for
*/
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/94a93d7e/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 343c349..1d1e5ff 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,12 @@ public class BasicOutputCollector implements IBasicOutputCollector {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
+ /**
+ * Resets the message timeout for any tuple trees to which the given tuple belongs.
+ * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * Note that this is an expensive operation, and should be used sparingly.
+ * @param input the tuple to reset timeout for
+ */
public void resetTimeout(Tuple tuple){
out.resetTimeout(tuple);
}
[5/6] storm git commit: Merge branch '1.x-branch' of
https://github.com/srdo/storm into STORM-1549-1.x
Posted by bo...@apache.org.
Merge branch '1.x-branch' of https://github.com/srdo/storm into STORM-1549-1.x
STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/887ec814
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/887ec814
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/887ec814
Branch: refs/heads/1.x-branch
Commit: 887ec814856b7b8e59146af8e3d80810f07fbe39
Parents: f0abfff e701a2a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Mar 16 09:13:26 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Mar 16 09:13:26 2016 -0500
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/clojure.clj | 3 ++
.../src/clj/org/apache/storm/daemon/acker.clj | 10 +++-
.../src/clj/org/apache/storm/daemon/common.clj | 9 +++-
.../clj/org/apache/storm/daemon/executor.clj | 10 ++++
.../storm/coordination/CoordinatedBolt.java | 4 ++
.../org/apache/storm/task/IOutputCollector.java | 1 +
.../org/apache/storm/task/OutputCollector.java | 11 +++++
.../storm/topology/BasicOutputCollector.java | 10 ++++
.../storm/topology/IBasicOutputCollector.java | 2 +
.../trident/topology/TridentBoltExecutor.java | 4 ++
.../org/apache/storm/integration_test.clj | 51 ++++++++++++++++++--
11 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------