You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/11 18:10:15 UTC
[43/50] [abbrv] git commit: STORM-297: use acked spout to avoid the
topology shutting down before the bolts finish.
STORM-297: use acked spout to avoid the topology shutting down before the bolts finish.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/23f57242
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/23f57242
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/23f57242
Branch: refs/heads/security
Commit: 23f572420b4b1b24e79b53067c9a2cb703599756
Parents: f7a7802
Author: Sean Zhong <cl...@gmail.com>
Authored: Sun Jun 8 03:41:46 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Sun Jun 8 03:41:46 2014 +0800
----------------------------------------------------------------------
.../storm/testing/TestEventLogSpout.java | 60 +++++++++++++++-----
1 file changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/23f57242/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
index a34484d..1570aeb 100644
--- a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
@@ -17,6 +17,7 @@
*/
package backtype.storm.testing;
+import static backtype.storm.utils.Utils.get;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.HashMap;
@@ -36,25 +37,40 @@ import backtype.storm.tuple.Values;
public class TestEventLogSpout extends BaseRichSpout {
public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
+ private static final Map<String, Integer> acked = new HashMap<String, Integer>();
+ private static final Map<String, Integer> failed = new HashMap<String, Integer>();
+
private String uid;
private long totalCount;
- private final static Map<String, AtomicLong> totalEmitCount = new HashMap<String, AtomicLong>();
SpoutOutputCollector _collector;
private long eventId = 0;
private long myCount;
private int source;
+ public static int getNumAcked(String stormId) {
+ synchronized(acked) {
+ return get(acked, stormId, 0);
+ }
+ }
+
+ public static int getNumFailed(String stormId) {
+ synchronized(failed) {
+ return get(failed, stormId, 0);
+ }
+ }
+
public TestEventLogSpout(long totalCount) {
this.uid = UUID.randomUUID().toString();
- this.totalCount = totalCount;
- synchronized (totalEmitCount) {
- if (null == totalEmitCount.get(uid)) {
- totalEmitCount.put(uid, new AtomicLong(0));
- }
-
+ synchronized(acked) {
+ acked.put(uid, 0);
}
+ synchronized(failed) {
+ failed.put(uid, 0);
+ }
+
+ this.totalCount = totalCount;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
@@ -69,13 +85,26 @@ public class TestEventLogSpout extends BaseRichSpout {
}
public void cleanup() {
- synchronized(totalEmitCount) {
- totalEmitCount.remove(uid);
+ synchronized(acked) {
+ acked.remove(uid);
+ }
+ synchronized(failed) {
+ failed.remove(uid);
}
}
public boolean completed() {
- Long totalEmitted = totalEmitCount.get(uid).get();
+
+ int ackedAmt;
+ int failedAmt;
+
+ synchronized(acked) {
+ ackedAmt = acked.get(uid);
+ }
+ synchronized(failed) {
+ failedAmt = failed.get(uid);
+ }
+ int totalEmitted = ackedAmt + failedAmt;
if (totalEmitted >= totalCount) {
return true;
@@ -87,16 +116,21 @@ public class TestEventLogSpout extends BaseRichSpout {
if (eventId < myCount) {
eventId++;
_collector.emit(new Values(source, eventId), eventId);
- totalEmitCount.get(uid).incrementAndGet();
}
}
public void ack(Object msgId) {
-
+ synchronized(acked) {
+ int curr = get(acked, uid, 0);
+ acked.put(uid, curr+1);
+ }
}
public void fail(Object msgId) {
-
+ synchronized(failed) {
+ int curr = get(failed, uid, 0);
+ failed.put(uid, curr+1);
+ }
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {