You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by lo...@apache.org on 2016/03/01 00:58:36 UTC

[13/13] storm git commit: Send failure response to spout instead of doing nothing, for the case that acker receives FAIL before INIT

Send failure response to spout instead of doing nothing, for the case that acker receives FAIL before INIT


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8138dc7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8138dc7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8138dc7

Branch: refs/heads/master
Commit: c8138dc72d4f39fe1ebb09c42931d6cbd3198c7e
Parents: 6e433c8
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 28 21:50:09 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 28 21:50:09 2016 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/daemon/Acker.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c8138dc7/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index 98f73df..7d05e24 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -94,18 +94,19 @@ public class Acker implements IBolt {
                 pending.put(id, curr);
             }
         } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            // For the case that ack_fail message arrives before ack_init
             if (curr == null) {
-                // The tuple has been already timeout or failed. So, do nothing
-                return;
+                curr = new AckObject();
             }
             curr.failed = true;
+            pending.put(id, curr);
         } else {
             LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
             return;
         }
 
         Integer task = curr.spoutTask;
-        if (task != null) {
+        if (curr != null && task != null) {
             if (curr.val == 0) {
                 pending.remove(id);
                 collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));