You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/16 15:36:53 UTC
[1/7] storm git commit: STORM-1549: Add support for resetting tuple
timeout from bolts via the OutputCollector
Repository: storm
Updated Branches:
refs/heads/master 7f52aecb1 -> 4d15d4c38
STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d36be51a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d36be51a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d36be51a
Branch: refs/heads/master
Commit: d36be51a39abb03ac47e01eb2e1fda31f9f9110b
Parents: d42c437
Author: Stig Rohde Døssing <ge...@gmail.com>
Authored: Sun Feb 14 02:39:42 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Tue Mar 1 23:52:36 2016 +0100
----------------------------------------------------------------------
.../src/clj/org/apache/storm/clojure.clj | 3 ++
.../src/clj/org/apache/storm/daemon/common.clj | 8 +++
.../clj/org/apache/storm/daemon/executor.clj | 11 +++-
.../clj/org/apache/storm/internal/clojure.clj | 3 ++
.../storm/coordination/CoordinatedBolt.java | 4 ++
.../src/jvm/org/apache/storm/daemon/Acker.java | 15 +++---
.../org/apache/storm/task/IOutputCollector.java | 1 +
.../org/apache/storm/task/OutputCollector.java | 10 ++++
.../storm/topology/BasicOutputCollector.java | 4 ++
.../storm/topology/IBasicOutputCollector.java | 2 +
.../trident/topology/TridentBoltExecutor.java | 4 ++
.../org/apache/storm/integration_test.clj | 53 ++++++++++++++++++--
12 files changed, 108 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-clojure/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/clojure.clj b/storm-clojure/src/clj/org/apache/storm/clojure.clj
index 9e1836f..607fc24 100644
--- a/storm-clojure/src/clj/org/apache/storm/clojure.clj
+++ b/storm-clojure/src/clj/org/apache/storm/clojure.clj
@@ -179,6 +179,9 @@
(defn fail! [collector ^Tuple tuple]
(.fail ^OutputCollector (:output-collector collector) tuple))
+(defn reset-timeout! [collector ^Tuple tuple]
+ (.resetTimeout ^OutputCollector (:output-collector collector) tuple))
+
(defn report-error! [collector ^Tuple tuple]
(.reportError ^OutputCollector (:output-collector collector) tuple))
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 65cf233..49b0bb9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -51,6 +51,7 @@
(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
+(def ACKER-RESET-TIMEOUT-STREAM-ID Acker/ACKER_RESET_TIMEOUT_STREAM_ID)
(def SYSTEM-STREAM-ID "__system")
@@ -202,6 +203,8 @@
{(Utils/getGlobalStreamId id ACKER-ACK-STREAM-ID)
(Thrift/prepareFieldsGrouping ["id"])
(Utils/getGlobalStreamId id ACKER-FAIL-STREAM-ID)
+ (Thrift/prepareFieldsGrouping ["id"])
+ (Utils/getGlobalStreamId id ACKER-RESET-TIMEOUT-STREAM-ID)
(Thrift/prepareFieldsGrouping ["id"])}
))]
(merge spout-inputs bolt-inputs)))
@@ -233,6 +236,7 @@
(mk-acker-bolt)
{ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"])
ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"])
+ ACKER-RESET-TIMEOUT-STREAM-ID (Thrift/directOutputFields ["id"])
}
(Integer. num-executors)
{TOPOLOGY-TASKS num-executors
@@ -242,6 +246,7 @@
(do
(.put_to_streams common ACKER-ACK-STREAM-ID (Thrift/outputFields ["id" "ack-val"]))
(.put_to_streams common ACKER-FAIL-STREAM-ID (Thrift/outputFields ["id"]))
+ (.put_to_streams common ACKER-RESET-TIMEOUT-STREAM-ID (Thrift/outputFields ["id"]))
))
(dofor [[_ spout] (.get_spouts ret)
:let [common (.get_common spout)
@@ -258,6 +263,9 @@
(.put_to_inputs common
(GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
(Thrift/prepareDirectGrouping))
+ (.put_to_inputs common
+ (GlobalStreamId. ACKER-COMPONENT-ID ACKER-RESET-TIMEOUT-STREAM-ID)
+ (Thrift/prepareDirectGrouping))
))
(.put_to_bolts ret "__acker" acker-bolt)
))
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 9ff93f8..de32544 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -529,6 +529,11 @@
spout-obj (:object task-data)]
(when (instance? ICredentialsListener spout-obj)
(.setCredentials spout-obj (.getValue tuple 0))))
+ ACKER-RESET-TIMEOUT-STREAM-ID
+ (let [id (.getValue tuple 0)
+ pending-for-id (.get pending id)]
+ (when pending-for-id
+ (.put pending id pending-for-id)))
(let [id (.getValue tuple 0)
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
@@ -830,9 +835,13 @@
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
+ (^void resetTimeout [this ^Tuple tuple]
+ (fast-list-iter [root (.. tuple getMessageId getAnchors)]
+ (task/send-unanchored task-data
+ ACKER-RESET-TIMEOUT-STREAM-ID
+ [root])))
(reportError [this error]
(report-error error))))))
-
(reset! open-or-prepare-was-called? true)
(log-message "Prepared bolt " component-id ":" (keys task-datas))
(setup-metrics! executor-data)
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/internal/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/clojure.clj b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
index 3f29757..f27ac04 100644
--- a/storm-core/src/clj/org/apache/storm/internal/clojure.clj
+++ b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
@@ -179,6 +179,9 @@
(defn fail! [collector ^Tuple tuple]
(.fail ^OutputCollector (:output-collector collector) tuple))
+(defn reset-timeout! [collector ^Tuple tuple]
+ (.resetTimeout ^OutputCollector (:output-collector collector) tuple))
+
(defn report-error! [collector ^Tuple tuple]
(.reportError ^OutputCollector (:output-collector collector) tuple))
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
index ee66b09..15ac5e2 100644
--- a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -124,6 +124,10 @@ public class CoordinatedBolt implements IRichBolt {
checkFinishId(tuple, TupleType.REGULAR);
_delegate.fail(tuple);
}
+
+ public void resetTimeout(Tuple tuple) {
+ _delegate.resetTimeout(tuple);
+ }
public void reportError(Throwable error) {
_delegate.reportError(error);
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index 7d05e24..eb14af7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -40,6 +40,7 @@ public class Acker implements IBolt {
public static final String ACKER_INIT_STREAM_ID = "__ack_init";
public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+ public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
public static final int TIMEOUT_BUCKET_NUM = 3;
@@ -100,6 +101,8 @@ public class Acker implements IBolt {
}
curr.failed = true;
pending.put(id, curr);
+ } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+ pending.put(id, curr);
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
return;
@@ -110,11 +113,11 @@ public class Acker implements IBolt {
if (curr.val == 0) {
pending.remove(id);
collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
- } else {
- if (curr.failed) {
- pending.remove(id);
- collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
- }
+ } else if (curr.failed) {
+ pending.remove(id);
+ collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
+ } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+ collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, new Values(id));
}
}
@@ -125,4 +128,4 @@ public class Acker implements IBolt {
public void cleanup() {
LOG.info("Acker: cleanup successfully");
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
index cbbe108..cda4d9f 100644
--- a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
@@ -29,4 +29,5 @@ public interface IOutputCollector extends IErrorReporter {
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
+ void resetTimeout(Tuple input);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index e6e54ac..071d8aa 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -218,6 +218,16 @@ public class OutputCollector implements IOutputCollector {
_delegate.fail(input);
}
+ /**
+ * Resets the message timeout for any tuple trees to which the given tuple belongs.
+ * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * @param input the tuple to reset timeout for
+ */
+ @Override
+ public void resetTimeout(Tuple input) {
+ _delegate.resetTimeout(input);
+ }
+
@Override
public void reportError(Throwable error) {
_delegate.reportError(error);
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index cedc7c9..343c349 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,10 @@ public class BasicOutputCollector implements IBasicOutputCollector {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
+ public void resetTimeout(Tuple tuple){
+ out.resetTimeout(tuple);
+ }
+
protected IOutputCollector getOutputter() {
return out;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
index 60da48a..7b7c9fc 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
@@ -18,10 +18,12 @@
package org.apache.storm.topology;
import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.tuple.Tuple;
import java.util.List;
public interface IBasicOutputCollector extends IErrorReporter{
List<Integer> emit(String streamId, List<Object> tuple);
void emitDirect(int taskId, String streamId, List<Object> tuple);
+ void resetTimeout(Tuple tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
index d85d217..41feb12 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
@@ -180,6 +180,10 @@ public class TridentBoltExecutor implements IRichBolt {
public void fail(Tuple tuple) {
throw new IllegalStateException("Method should never be called");
}
+
+ public void resetTimeout(Tuple tuple) {
+ throw new IllegalStateException("Method should never be called");
+ }
public void reportError(Throwable error) {
_delegate.reportError(error);
http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 697bdae..6d3b8f0 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -20,6 +20,7 @@
(:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
+ (:import [org.apache.storm.utils Time])
(:import [org.apache.storm.tuple Fields])
(:import [org.apache.storm.cluster StormClusterStateImpl])
(:use [org.apache.storm.internal clojure])
@@ -97,9 +98,18 @@
(ack! collector tuple)
))))))
-(defn assert-loop [afn ids]
- (while (not (every? afn ids))
- (Thread/sleep 1)))
+(defn assert-loop
+([afn ids] (assert-loop afn ids 10))
+([afn ids timeout-secs]
+ (loop [remaining-time (* timeout-secs 1000)]
+ (let [start-time (System/currentTimeMillis)
+ assertion-is-true (every? afn ids)]
+ (if (or assertion-is-true (neg? remaining-time))
+ (is assertion-is-true)
+ (do
+ (Thread/sleep 1)
+ (recur (- remaining-time (- (System/currentTimeMillis) start-time)))
+ ))))))
(defn assert-acked [tracker & ids]
(assert-loop #(.isAcked tracker %) ids))
@@ -132,6 +142,43 @@
(assert-failed tracker 2)
)))
+(defbolt extend-timeout-twice {} {:prepare true}
+ [conf context collector]
+ (let [state (atom -1)]
+ (bolt
+ (execute [tuple]
+ (do
+ (Time/sleep (* 8 1000))
+ (reset-timeout! collector tuple)
+ (Time/sleep (* 8 1000))
+ (reset-timeout! collector tuple)
+ (Time/sleep (* 8 1000))
+ (ack! collector tuple)
+ )))))
+
+(deftest test-reset-timeout
+ (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails feeder)}
+ {"2" (Thrift/prepareBoltDetails
+ {(Utils/getGlobalStreamId "1" nil)
+ (Thrift/prepareGlobalGrouping)} extend-timeout-twice)})]
+ (submit-local-topology (:nimbus cluster)
+ "timeout-tester"
+ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+ topology)
+ (advance-cluster-time cluster 11)
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 21)
+ (is (not (.isFailed tracker 1)))
+ (is (not (.isAcked tracker 1)))
+ (advance-cluster-time cluster 5)
+ (assert-acked tracker 1)
+ )))
+
(defn mk-validate-topology-1 []
(Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
[5/7] storm git commit: STORM-1549: Update branch to use java
StormCommons
Posted by bo...@apache.org.
STORM-1549: Update branch to use java StormCommons
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e0b874f9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e0b874f9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e0b874f9
Branch: refs/heads/master
Commit: e0b874f99314e45ec8f74f9b08fe1d742454d419
Parents: e2d118a
Author: Stig Døssing <st...@gmail.com>
Authored: Tue Mar 15 21:15:19 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Tue Mar 15 21:15:19 2016 +0100
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/daemon/executor.clj | 4 ++--
storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java | 4 ++++
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e0b874f9/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 075f72b..086955f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -529,7 +529,7 @@
spout-obj (:object task-data)]
(when (instance? ICredentialsListener spout-obj)
(.setCredentials spout-obj (.getValue tuple 0))))
- ACKER-RESET-TIMEOUT-STREAM-ID
+ Acker/ACKER_RESET_TIMEOUT_STREAM_ID
(let [id (.getValue tuple 0)
pending-for-id (.get pending id)]
(when pending-for-id
@@ -838,7 +838,7 @@
(^void resetTimeout [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
- ACKER-RESET-TIMEOUT-STREAM-ID
+ Acker/ACKER_RESET_TIMEOUT_STREAM_ID
[root])))
(reportError [this error]
(report-error error))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/e0b874f9/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 85568ec..7792052 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -257,6 +257,7 @@ public class StormCommon {
for(String id : boltIds) {
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
}
return inputs;
}
@@ -275,6 +276,7 @@ public class StormCommon {
Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
+ outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
Map<String, Object> ackerConf = new HashMap<String, Object>();
ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
@@ -286,6 +288,7 @@ public class StormCommon {
ComponentCommon common = bolt.get_common();
common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+ common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
}
for (SpoutSpec spout : topology.get_spouts().values()) {
@@ -296,6 +299,7 @@ public class StormCommon {
common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
+ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareDirectGrouping());
}
topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
[7/7] storm git commit: Added STORM-1549 to Changelog
Posted by bo...@apache.org.
Added STORM-1549 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d15d4c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d15d4c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d15d4c3
Branch: refs/heads/master
Commit: 4d15d4c3851acf94fbfe876b3d13842c492bdbc3
Parents: ceeab06
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Mar 16 09:35:56 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Mar 16 09:35:56 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4d15d4c3/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f6fdb76..0f06098 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -57,6 +57,7 @@
* STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
## 1.0.0
+ * STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
* STORM-971: Metric for messages lost due to kafka retention
* STORM-1483: add storm-mongodb connector
* STORM-1608: Fix stateful topology acking behavior
[4/7] storm git commit: Merge branch 'master' of
https://github.com/apache/storm
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/apache/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e2d118af
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e2d118af
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e2d118af
Branch: refs/heads/master
Commit: e2d118af9071ab6bcdcbec18df055770b991ee79
Parents: f68b4c6 500ef20
Author: Stig Døssing <st...@gmail.com>
Authored: Tue Mar 15 20:49:51 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Tue Mar 15 20:49:51 2016 +0100
----------------------------------------------------------------------
.gitignore | 2 +
CHANGELOG.md | 26 +
bin/storm | 19 +-
bin/storm.py | 2 +-
conf/defaults.yaml | 3 +-
conf/log4j2.xml | 2 +-
.../travis/print-errors-from-test-reports.py | 4 +
.../org/apache/storm/starter/ManualDRPC.java | 53 +-
.../storm/starter/spout/RandomIntegerSpout.java | 15 +-
.../src/jvm/storm/starter/StatefulTopology.java | 1 +
.../apache/storm/kafka/PartitionManager.java | 12 +-
external/storm-mongodb/README.md | 195 ++
external/storm-mongodb/pom.xml | 74 +
.../storm/mongodb/bolt/AbstractMongoBolt.java | 56 +
.../storm/mongodb/bolt/MongoInsertBolt.java | 62 +
.../storm/mongodb/bolt/MongoUpdateBolt.java | 75 +
.../storm/mongodb/common/MongoDBClient.java | 91 +
.../mongodb/common/QueryFilterCreator.java | 38 +
.../common/SimpleQueryFilterCreator.java | 39 +
.../mongodb/common/mapper/MongoMapper.java | 38 +
.../common/mapper/SimpleMongoMapper.java | 40 +
.../common/mapper/SimpleMongoUpdateMapper.java | 41 +
.../storm/mongodb/trident/state/MongoState.java | 97 +
.../trident/state/MongoStateFactory.java | 42 +
.../trident/state/MongoStateUpdater.java | 34 +
.../storm/mongodb/topology/InsertWordCount.java | 81 +
.../storm/mongodb/topology/UpdateWordCount.java | 91 +
.../storm/mongodb/topology/WordCounter.java | 67 +
.../storm/mongodb/topology/WordSpout.java | 88 +
.../storm/mongodb/trident/WordCountTrident.java | 85 +
pom.xml | 26 +-
.../src/clj/org/apache/storm/LocalDRPC.clj | 56 -
.../src/clj/org/apache/storm/MockAutoCred.clj | 58 -
.../clj/org/apache/storm/command/heartbeats.clj | 5 +-
.../src/clj/org/apache/storm/converter.clj | 61 +-
.../org/apache/storm/daemon/builtin_metrics.clj | 33 +-
.../src/clj/org/apache/storm/daemon/common.clj | 369 +--
.../src/clj/org/apache/storm/daemon/drpc.clj | 221 +-
.../clj/org/apache/storm/daemon/executor.clj | 46 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 29 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 303 +--
.../clj/org/apache/storm/daemon/supervisor.clj | 34 +-
.../src/clj/org/apache/storm/daemon/task.clj | 15 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 42 +-
storm-core/src/clj/org/apache/storm/stats.clj | 1568 -----------
storm-core/src/clj/org/apache/storm/testing.clj | 108 +-
.../clj/org/apache/storm/trident/testing.clj | 2 -
storm-core/src/clj/org/apache/storm/ui/core.clj | 165 +-
.../src/clj/org/apache/storm/ui/helpers.clj | 10 +-
storm-core/src/clj/org/apache/storm/util.clj | 11 -
.../src/jvm/org/apache/storm/LocalDRPC.java | 72 +
.../storm/cluster/StormClusterStateImpl.java | 7 +-
.../org/apache/storm/daemon/DaemonCommon.java | 22 +
.../jvm/org/apache/storm/daemon/DrpcServer.java | 357 +++
.../org/apache/storm/daemon/StormCommon.java | 537 ++++
.../storm/daemon/metrics/MetricsUtils.java | 2 +-
.../jvm/org/apache/storm/drpc/DRPCSpout.java | 2 +
.../apache/storm/messaging/netty/Client.java | 34 +
.../storm/metric/StormMetricsRegistry.java | 84 +
.../auth/AbstractSaslClientCallbackHandler.java | 76 +
.../auth/AbstractSaslServerCallbackHandler.java | 94 +
.../apache/storm/security/auth/AuthUtils.java | 40 +
.../auth/digest/ClientCallbackHandler.java | 60 +-
.../auth/digest/ServerCallbackHandler.java | 61 +-
.../storm/security/auth/kerberos/AutoTGT.java | 64 +-
.../auth/kerberos/AutoTGTKrb5LoginModule.java | 8 +-
.../auth/plain/PlainClientCallbackHandler.java | 31 +
.../auth/plain/PlainSaslTransportPlugin.java | 71 +
.../auth/plain/PlainServerCallbackHandler.java | 55 +
.../security/auth/plain/SaslPlainServer.java | 158 ++
.../apache/storm/stats/BoltExecutorStats.java | 105 +
.../jvm/org/apache/storm/stats/CommonStats.java | 112 +
.../apache/storm/stats/SpoutExecutorStats.java | 79 +
.../jvm/org/apache/storm/stats/StatsUtil.java | 2441 ++++++++++++++++++
.../topology/CheckpointTupleForwarder.java | 22 +-
.../apache/storm/topology/IStatefulBolt.java | 7 +-
.../storm/topology/StatefulBoltExecutor.java | 46 +-
.../apache/storm/topology/TopologyBuilder.java | 5 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 26 +-
.../storm/utils/StormCommonInstaller.java | 43 +
.../src/jvm/org/apache/storm/utils/Utils.java | 42 +-
.../org/apache/storm/integration_test.clj | 6 +-
.../test/clj/org/apache/storm/drpc_test.clj | 27 +-
.../apache/storm/messaging/netty_unit_test.clj | 14 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 148 +-
.../apache/storm/security/auth/auth_test.clj | 18 +-
.../security/auth/auto_login_module_test.clj | 24 +-
.../storm/security/auth/drpc_auth_test.clj | 20 +-
.../storm/security/auth/nimbus_auth_test.clj | 15 +-
.../clj/org/apache/storm/serialization_test.clj | 85 +-
.../clj/org/apache/storm/supervisor_test.clj | 28 +-
.../test/jvm/org/apache/storm/MockAutoCred.java | 75 +
.../org/apache/storm/TestConfigValidate.java | 20 +
.../storm/serialization/SerializationTest.java | 115 +
.../topology/StatefulBoltExecutorTest.java | 1 +
storm-dist/binary/src/main/assembly/binary.xml | 14 +
96 files changed, 6867 insertions(+), 3141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e2d118af/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e2d118af/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
[6/7] storm git commit: Merge branch 'master' of
https://github.com/srdo/storm into STORM-1549
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/srdo/storm into STORM-1549
STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ceeab062
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ceeab062
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ceeab062
Branch: refs/heads/master
Commit: ceeab062ef73ecdfa66c136ba655ce773fb2d9fc
Parents: 7f52aec e0b874f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Mar 16 08:50:38 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Mar 16 08:50:38 2016 -0500
----------------------------------------------------------------------
.../src/clj/org/apache/storm/clojure.clj | 3 ++
.../clj/org/apache/storm/daemon/executor.clj | 11 +++-
.../clj/org/apache/storm/internal/clojure.clj | 3 ++
.../storm/coordination/CoordinatedBolt.java | 4 ++
.../src/jvm/org/apache/storm/daemon/Acker.java | 18 ++++---
.../org/apache/storm/daemon/StormCommon.java | 4 ++
.../org/apache/storm/task/IOutputCollector.java | 1 +
.../org/apache/storm/task/OutputCollector.java | 11 ++++
.../storm/topology/BasicOutputCollector.java | 10 ++++
.../storm/topology/IBasicOutputCollector.java | 2 +
.../trident/topology/TridentBoltExecutor.java | 4 ++
.../org/apache/storm/integration_test.clj | 53 ++++++++++++++++++--
12 files changed, 114 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/7] storm git commit: Add a missing space, fix potential NPE,
add comment to javadoc about reset timeout being expensive
Posted by bo...@apache.org.
Add a missing space, fix potential NPE, add comment to javadoc about reset timeout being expensive
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc79b4a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc79b4a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc79b4a8
Branch: refs/heads/master
Commit: bc79b4a8d757a3191a85815877345d38710c73e2
Parents: d36be51
Author: Stig Døssing <st...@gmail.com>
Authored: Wed Mar 2 17:58:01 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Wed Mar 2 17:59:14 2016 +0100
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/daemon/Acker.java | 5 ++++-
storm-core/src/jvm/org/apache/storm/task/OutputCollector.java | 1 +
.../jvm/org/apache/storm/topology/BasicOutputCollector.java | 6 ++++++
3 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index eb14af7..d7b9a2e 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -101,7 +101,10 @@ public class Acker implements IBolt {
}
curr.failed = true;
pending.put(id, curr);
- } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+ } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+ if (curr == null) {
+ curr = new AckObject();
+ }
pending.put(id, curr);
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index 071d8aa..4db87f0 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -221,6 +221,7 @@ public class OutputCollector implements IOutputCollector {
/**
* Resets the message timeout for any tuple trees to which the given tuple belongs.
* The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * Note that this is an expensive operation, and should be used sparingly.
* @param input the tuple to reset timeout for
*/
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 343c349..1d1e5ff 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,12 @@ public class BasicOutputCollector implements IBasicOutputCollector {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
+ /**
+ * Resets the message timeout for any tuple trees to which the given tuple belongs.
+ * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * Note that this is an expensive operation, and should be used sparingly.
+ * @param input the tuple to reset timeout for
+ */
public void resetTimeout(Tuple tuple){
out.resetTimeout(tuple);
}
[3/7] storm git commit: Fix javadoc param name
Posted by bo...@apache.org.
Fix javadoc param name
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f68b4c63
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f68b4c63
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f68b4c63
Branch: refs/heads/master
Commit: f68b4c6362e45af5b6cc7569e6d37982220947cd
Parents: bc79b4a
Author: Stig Døssing <st...@gmail.com>
Authored: Wed Mar 2 18:03:11 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Wed Mar 2 18:03:11 2016 +0100
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/topology/BasicOutputCollector.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f68b4c63/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 1d1e5ff..2cf1e82 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -56,7 +56,7 @@ public class BasicOutputCollector implements IBasicOutputCollector {
* Resets the message timeout for any tuple trees to which the given tuple belongs.
* The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
* Note that this is an expensive operation, and should be used sparingly.
- * @param input the tuple to reset timeout for
+ * @param tuple the tuple to reset timeout for
*/
public void resetTimeout(Tuple tuple){
out.resetTimeout(tuple);