You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/18 04:54:02 UTC

[2/4] storm git commit: Incorporating review comments - pending count for Trident topologies to return the sum of sizes of buckets in the rotating map

Incorporating review comments
- pending count for Trident topologies to return the sum of sizes of
  buckets in the rotating map

Merge with master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0459bc97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0459bc97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0459bc97

Branch: refs/heads/master
Commit: 0459bc97f6e7ea80c3212f9bb5cfdc89a1e5450d
Parents: 3d03b93
Author: Shyam Rajendran <rs...@gmail.com>
Authored: Mon Jul 6 10:28:01 2015 -0500
Committer: Shyam Rajendran <rs...@gmail.com>
Committed: Mon Jul 6 10:42:33 2015 -0500

----------------------------------------------------------------------
 .../eventhubs/spout/SpoutOutputCollectorMock.java     | 10 +++-------
 .../storm/trident/spout/RichSpoutBatchExecutor.java   | 14 +++++---------
 .../storm/trident/spout/RichSpoutBatchTriggerer.java  | 14 +++++++-------
 3 files changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 9f33c89..df4a3ba 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -17,18 +17,16 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-
 import java.util.List;
 
+import backtype.storm.spout.ISpoutOutputCollector;
+
 /**
  * Mock of ISpoutOutputCollector
  */
 public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
   //comma separated offsets
   StringBuilder emittedOffset;
-  SpoutOutputCollector _collector;
   
   public SpoutOutputCollectorMock() {
     emittedOffset = new StringBuilder();
@@ -63,8 +61,6 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
 
   @Override
   public long getPendingCount() {
-    return _collector.getPendingCount();
+    return 0;
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index b81953d..ab9fd4b 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -24,13 +24,12 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.RotatingMap;
-import storm.trident.operation.TridentCollector;
-import storm.trident.topology.TransactionAttempt;
-import storm.trident.util.TridentUtils;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import storm.trident.operation.TridentCollector;
+import storm.trident.topology.TransactionAttempt;
+import storm.trident.util.TridentUtils;
 
 public class RichSpoutBatchExecutor implements ITridentSpout {
     public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
@@ -82,8 +81,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
             idsMap = new RotatingMap(3);
             rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
         }
-
-
+        
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
             long txid = tx.getTransactionId();
@@ -140,8 +138,6 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
                 }
             }
         }
-
-
         
         @Override
         public void close() {
@@ -198,7 +194,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
         public void emitDirect(int task, String stream, List<Object> values, Object id) {
             throw new UnsupportedOperationException("Trident does not support direct streams");
         }
-
+        
         @Override
         public long getPendingCount() {
             return pendingCount;

http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index ae6fedf..0380728 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
-import java.util.ArrayList;		
-import java.util.HashMap;		
-import java.util.HashSet;		
-import java.util.List;		
-import java.util.Map;		
-import java.util.Random;		
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import storm.trident.topology.TridentBoltExecutor;
 import storm.trident.tuple.ConsList;
@@ -173,7 +173,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
         public void reportError(Throwable t) {
             _collector.reportError(t);
         }
-
+        
         @Override
         public long getPendingCount() {
             return _collector.getPendingCount();