You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/16 15:36:54 UTC

[2/7] storm git commit: Add a missing space, fix potential NPE, add comment to javadoc about reset timeout being expensive

Add a missing space, fix potential NPE, add comment to javadoc about reset timeout being expensive


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc79b4a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc79b4a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc79b4a8

Branch: refs/heads/master
Commit: bc79b4a8d757a3191a85815877345d38710c73e2
Parents: d36be51
Author: Stig Døssing <st...@gmail.com>
Authored: Wed Mar 2 17:58:01 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Wed Mar 2 17:59:14 2016 +0100

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/daemon/Acker.java          | 5 ++++-
 storm-core/src/jvm/org/apache/storm/task/OutputCollector.java  | 1 +
 .../jvm/org/apache/storm/topology/BasicOutputCollector.java    | 6 ++++++
 3 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index eb14af7..d7b9a2e 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -101,7 +101,10 @@ public class Acker implements IBolt {
             }
             curr.failed = true;
             pending.put(id, curr);
-        } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+            }
             pending.put(id, curr);
         } else {
             LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());

http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index 071d8aa..4db87f0 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -221,6 +221,7 @@ public class OutputCollector implements IOutputCollector {
     /**
     * Resets the message timeout for any tuple trees to which the given tuple belongs.
     * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+    * Note that this is an expensive operation, and should be used sparingly.
     * @param input the tuple to reset timeout for
     */
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 343c349..1d1e5ff 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,12 @@ public class BasicOutputCollector implements IBasicOutputCollector {
         emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
     }
 
+    /**
+    * Resets the message timeout for any tuple trees to which the given tuple belongs.
+    * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+    * Note that this is an expensive operation, and should be used sparingly.
+    * @param tuple the tuple to reset timeout for
+    */
     public void resetTimeout(Tuple tuple){
         out.resetTimeout(tuple);
     }