You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/16 15:36:54 UTC
[2/7] storm git commit: Add a missing space, fix potential NPE,
add comment to javadoc about reset timeout being expensive
Add a missing space, fix potential NPE, add comment to javadoc about reset timeout being expensive
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc79b4a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc79b4a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc79b4a8
Branch: refs/heads/master
Commit: bc79b4a8d757a3191a85815877345d38710c73e2
Parents: d36be51
Author: Stig Døssing <st...@gmail.com>
Authored: Wed Mar 2 17:58:01 2016 +0100
Committer: Stig Døssing <st...@gmail.com>
Committed: Wed Mar 2 17:59:14 2016 +0100
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/daemon/Acker.java | 5 ++++-
storm-core/src/jvm/org/apache/storm/task/OutputCollector.java | 1 +
.../jvm/org/apache/storm/topology/BasicOutputCollector.java | 6 ++++++
3 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index eb14af7..d7b9a2e 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -101,7 +101,10 @@ public class Acker implements IBolt {
}
curr.failed = true;
pending.put(id, curr);
- } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+ } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+ if (curr == null) {
+ curr = new AckObject();
+ }
pending.put(id, curr);
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index 071d8aa..4db87f0 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -221,6 +221,7 @@ public class OutputCollector implements IOutputCollector {
/**
* Resets the message timeout for any tuple trees to which the given tuple belongs.
* The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * Note that this is an expensive operation, and should be used sparingly.
* @param input the tuple to reset timeout for
*/
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 343c349..1d1e5ff 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,12 @@ public class BasicOutputCollector implements IBasicOutputCollector {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
+ /**
+ * Resets the message timeout for any tuple trees to which the given tuple belongs.
+ * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+ * Note that this is an expensive operation, and should be used sparingly.
+ * @param tuple the tuple to reset timeout for
+ */
public void resetTimeout(Tuple tuple){
out.resetTimeout(tuple);
}