You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/18 04:54:02 UTC
[2/4] storm git commit: Incorporating review comments - pending count
for Trident topologies to return the sum of sizes of buckets in the rotating
map
Incorporating review comments
- pending count for Trident topologies to return the sum of sizes of
buckets in the rotating map
Merge with master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0459bc97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0459bc97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0459bc97
Branch: refs/heads/master
Commit: 0459bc97f6e7ea80c3212f9bb5cfdc89a1e5450d
Parents: 3d03b93
Author: Shyam Rajendran <rs...@gmail.com>
Authored: Mon Jul 6 10:28:01 2015 -0500
Committer: Shyam Rajendran <rs...@gmail.com>
Committed: Mon Jul 6 10:42:33 2015 -0500
----------------------------------------------------------------------
.../eventhubs/spout/SpoutOutputCollectorMock.java | 10 +++-------
.../storm/trident/spout/RichSpoutBatchExecutor.java | 14 +++++---------
.../storm/trident/spout/RichSpoutBatchTriggerer.java | 14 +++++++-------
3 files changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 9f33c89..df4a3ba 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -17,18 +17,16 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-
import java.util.List;
+import backtype.storm.spout.ISpoutOutputCollector;
+
/**
* Mock of ISpoutOutputCollector
*/
public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
//comma separated offsets
StringBuilder emittedOffset;
- SpoutOutputCollector _collector;
public SpoutOutputCollectorMock() {
emittedOffset = new StringBuilder();
@@ -63,8 +61,6 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
@Override
public long getPendingCount() {
- return _collector.getPendingCount();
+ return 0;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index b81953d..ab9fd4b 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -24,13 +24,12 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.RotatingMap;
-import storm.trident.operation.TridentCollector;
-import storm.trident.topology.TransactionAttempt;
-import storm.trident.util.TridentUtils;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import storm.trident.operation.TridentCollector;
+import storm.trident.topology.TransactionAttempt;
+import storm.trident.util.TridentUtils;
public class RichSpoutBatchExecutor implements ITridentSpout {
public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
@@ -82,8 +81,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
idsMap = new RotatingMap(3);
rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
}
-
-
+
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
long txid = tx.getTransactionId();
@@ -140,8 +138,6 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
}
}
}
-
-
@Override
public void close() {
@@ -198,7 +194,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
public void emitDirect(int task, String stream, List<Object> values, Object id) {
throw new UnsupportedOperationException("Trident does not support direct streams");
}
-
+
@Override
public long getPendingCount() {
return pendingCount;
http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index ae6fedf..0380728 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.Set;
import storm.trident.topology.TridentBoltExecutor;
import storm.trident.tuple.ConsList;
@@ -173,7 +173,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
public void reportError(Throwable t) {
_collector.reportError(t);
}
-
+
@Override
public long getPendingCount() {
return _collector.getPendingCount();