You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/11 18:10:15 UTC

[43/50] [abbrv] git commit: STORM-297: use acked spout to avoid topology be shutting down before bolt finish.

STORM-297: use acked spout to avoid topology be shutting down before bolt finish.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/23f57242
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/23f57242
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/23f57242

Branch: refs/heads/security
Commit: 23f572420b4b1b24e79b53067c9a2cb703599756
Parents: f7a7802
Author: Sean Zhong <cl...@gmail.com>
Authored: Sun Jun 8 03:41:46 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Sun Jun 8 03:41:46 2014 +0800

----------------------------------------------------------------------
 .../storm/testing/TestEventLogSpout.java        | 60 +++++++++++++++-----
 1 file changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/23f57242/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
index a34484d..1570aeb 100644
--- a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.testing;
 
+import static backtype.storm.utils.Utils.get;
 import backtype.storm.topology.OutputFieldsDeclarer;
 
 import java.util.HashMap;
@@ -36,25 +37,40 @@ import backtype.storm.tuple.Values;
 public class TestEventLogSpout extends BaseRichSpout {
     public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
     
+    private static final Map<String, Integer> acked = new HashMap<String, Integer>();
+    private static final Map<String, Integer> failed = new HashMap<String, Integer>();
+    
     private String uid;
     private long totalCount;
-    private final static Map<String, AtomicLong> totalEmitCount = new HashMap<String, AtomicLong>();
     
     SpoutOutputCollector _collector;
     private long eventId = 0;
     private long myCount;
     private int source;
     
+    public static int getNumAcked(String stormId) {
+        synchronized(acked) {
+            return get(acked, stormId, 0);
+        }
+    }
+
+    public static int getNumFailed(String stormId) {
+        synchronized(failed) {
+            return get(failed, stormId, 0);
+        }
+    }
+    
     public TestEventLogSpout(long totalCount) {
         this.uid = UUID.randomUUID().toString();
-        this.totalCount = totalCount;
         
-        synchronized (totalEmitCount) {
-            if (null == totalEmitCount.get(uid)) {
-                totalEmitCount.put(uid, new AtomicLong(0));
-            }
-            
+        synchronized(acked) {
+            acked.put(uid, 0);
         }
+        synchronized(failed) {
+            failed.put(uid, 0);
+        }
+        
+        this.totalCount = totalCount;
     }
         
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
@@ -69,13 +85,26 @@ public class TestEventLogSpout extends BaseRichSpout {
     }
     
     public void cleanup() {
-        synchronized(totalEmitCount) {            
-            totalEmitCount.remove(uid);
+        synchronized(acked) {            
+            acked.remove(uid);
+        } 
+        synchronized(failed) {            
+            failed.remove(uid);
         }
     }
     
     public boolean completed() {
-        Long totalEmitted = totalEmitCount.get(uid).get();
+        
+        int ackedAmt;
+        int failedAmt;
+        
+        synchronized(acked) {
+            ackedAmt = acked.get(uid);
+        }
+        synchronized(failed) {
+            failedAmt = failed.get(uid);
+        }
+        int totalEmitted = ackedAmt + failedAmt;
         
         if (totalEmitted >= totalCount) {
             return true;
@@ -87,16 +116,21 @@ public class TestEventLogSpout extends BaseRichSpout {
         if (eventId < myCount) { 
             eventId++;
             _collector.emit(new Values(source, eventId), eventId);
-            totalEmitCount.get(uid).incrementAndGet();
         }        
     }
     
     public void ack(Object msgId) {
-
+        synchronized(acked) {
+            int curr = get(acked, uid, 0);
+            acked.put(uid, curr+1);
+        }
     }
 
     public void fail(Object msgId) {
-        
+        synchronized(failed) {
+            int curr = get(failed, uid, 0);
+            failed.put(uid, curr+1);
+        }
     }
     
     public void declareOutputFields(OutputFieldsDeclarer declarer) {