You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/18 04:54:01 UTC

[1/4] storm git commit: STORM-67 Provide API for spouts to know how many pending messages there are - changes to classes implementing ISpoutOutputCollector - executor.clj getPendingCount definition

Repository: storm
Updated Branches:
  refs/heads/master 5ac306237 -> 54f6b32f6


STORM-67 Provide API for spouts to know how many pending messages there
are
 - changes to classes implementing ISpoutOutputCollector
 - executor.clj getPendingCount  definition


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3d03b931
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3d03b931
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3d03b931

Branch: refs/heads/master
Commit: 3d03b931b66d43fd5dd383c5e0ac23c55b772232
Parents: 27a3a6b
Author: Shyam Rajendran <sr...@yahoo-inc.com>
Authored: Thu Jun 11 12:38:49 2015 -0500
Committer: Shyam Rajendran <rs...@gmail.com>
Committed: Mon Jul 6 10:42:21 2015 -0500

----------------------------------------------------------------------
 .../spout/SpoutOutputCollectorMock.java         | 13 ++++++++++--
 .../src/clj/backtype/storm/daemon/executor.clj  |  3 +++
 .../storm/spout/ISpoutOutputCollector.java      |  1 +
 .../storm/spout/SpoutOutputCollector.java       |  5 +++++
 .../backtype/storm/testing/SpoutTracker.java    |  6 ++++++
 .../trident/spout/RichSpoutBatchExecutor.java   | 21 ++++++++++++++------
 .../trident/spout/RichSpoutBatchTriggerer.java  | 18 ++++++++++-------
 7 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 02e6830..9f33c89 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -17,9 +17,10 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import java.util.List;
-
 import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+
+import java.util.List;
 
 /**
  * Mock of ISpoutOutputCollector
@@ -27,6 +28,7 @@ import backtype.storm.spout.ISpoutOutputCollector;
 public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
   //comma separated offsets
   StringBuilder emittedOffset;
+  SpoutOutputCollector _collector;
   
   public SpoutOutputCollectorMock() {
     emittedOffset = new StringBuilder();
@@ -58,4 +60,11 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
   @Override
   public void reportError(Throwable arg0) {
   }
+
+  @Override
+  public long getPendingCount() {
+    return _collector.getPendingCount();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 454fd0d..4f5cc75 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -558,6 +558,9 @@
                  (:user-context task-data)
                  (SpoutOutputCollector.
                   (reify ISpoutOutputCollector
+                    (^long getPendingCount[this]
+                      (.size pending)
+                      )
                     (^List emit [this ^String stream-id ^List tuple ^Object message-id]
                       (send-spout-msg stream-id tuple message-id nil)
                       )

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
index 3cebe43..26a4843 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
@@ -26,5 +26,6 @@ public interface ISpoutOutputCollector {
     List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
     void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
     void reportError(Throwable error);
+    long getPendingCount();
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
index 7a33026..f23692b 100644
--- a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
@@ -131,4 +131,9 @@ public class SpoutOutputCollector implements ISpoutOutputCollector {
     public void reportError(Throwable error) {
         _delegate.reportError(error);
     }
+
+    @Override
+    public long getPendingCount() {
+        return _delegate.getPendingCount();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
index 75ba2b8..c4b5ff1 100644
--- a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
+++ b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
@@ -65,6 +65,12 @@ public class SpoutTracker extends BaseRichSpout {
         public void reportError(Throwable error) {
         	_collector.reportError(error);
         }
+
+        @Override
+        public long getPendingCount() {
+            return _collector.getPendingCount();
+        }
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index 345a5a0..b81953d 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -24,13 +24,14 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.RotatingMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import storm.trident.operation.TridentCollector;
 import storm.trident.topology.TransactionAttempt;
 import storm.trident.util.TridentUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 public class RichSpoutBatchExecutor implements ITridentSpout {
     public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
 
@@ -81,7 +82,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
             idsMap = new RotatingMap(3);
             rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
         }
-        
+
+
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
             long txid = tx.getTransactionId();
@@ -112,6 +114,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
                 }
             }
             idsMap.put(txid, _collector.ids);
+            _collector.pendingCount = idsMap.size();
 
         }
 
@@ -137,6 +140,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
                 }
             }
         }
+
+
         
         @Override
         public void close() {
@@ -170,7 +175,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
         TridentCollector _collector;
         public List<Object> ids;
         public int numEmitted;
-        
+        public long pendingCount;
         public void reset(TridentCollector c) {
             _collector = c;
             ids = new ArrayList<Object>();
@@ -193,7 +198,11 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
         public void emitDirect(int task, String stream, List<Object> values, Object id) {
             throw new UnsupportedOperationException("Trident does not support direct streams");
         }
-        
+
+        @Override
+        public long getPendingCount() {
+            return pendingCount;
+        }
     }
     
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index 728d51e..ae6fedf 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.ArrayList;		
+import java.util.HashMap;		
+import java.util.HashSet;		
+import java.util.List;		
+import java.util.Map;		
+import java.util.Random;		
 import java.util.Set;
 import storm.trident.topology.TridentBoltExecutor;
 import storm.trident.tuple.ConsList;
@@ -173,6 +173,10 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
         public void reportError(Throwable t) {
             _collector.reportError(t);
         }
-        
+
+        @Override
+        public long getPendingCount() {
+            return _collector.getPendingCount();
+        }
     }
 }


[3/4] storm git commit: Merge branch 'STORM-67' of https://github.com/bourneagain/storm into STORM-67

Posted by ka...@apache.org.
Merge branch 'STORM-67' of https://github.com/bourneagain/storm into STORM-67

Conflicts:
	storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd219375
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd219375
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd219375

Branch: refs/heads/master
Commit: fd21937526b4b20ac1670c623645ba131ed512ac
Parents: 5ac3062 0459bc9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jul 18 11:51:15 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jul 18 11:51:15 2015 +0900

----------------------------------------------------------------------
 .../storm/eventhubs/spout/SpoutOutputCollectorMock.java       | 5 +++++
 storm-core/src/clj/backtype/storm/daemon/executor.clj         | 3 +++
 .../src/jvm/backtype/storm/spout/ISpoutOutputCollector.java   | 1 +
 .../src/jvm/backtype/storm/spout/SpoutOutputCollector.java    | 5 +++++
 storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java   | 6 ++++++
 .../src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java   | 7 ++++++-
 .../src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java  | 4 ++++
 7 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fd219375/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/fd219375/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
index a7965ee,26a4843..709ae2a
--- a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
@@@ -27,5 -25,7 +27,6 @@@ public interface ISpoutOutputCollector 
      */
      List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
      void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
 -    void reportError(Throwable error);
+     long getPendingCount();
  }
  


[2/4] storm git commit: Incorporating review comments - pending count for Trident topologies to return the sum of sizes of buckets in the rotating map

Posted by ka...@apache.org.
Incorporating review comments
- pending count for Trident topologies to return the sum of sizes of
  buckets in the rotating map

Merge with master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0459bc97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0459bc97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0459bc97

Branch: refs/heads/master
Commit: 0459bc97f6e7ea80c3212f9bb5cfdc89a1e5450d
Parents: 3d03b93
Author: Shyam Rajendran <rs...@gmail.com>
Authored: Mon Jul 6 10:28:01 2015 -0500
Committer: Shyam Rajendran <rs...@gmail.com>
Committed: Mon Jul 6 10:42:33 2015 -0500

----------------------------------------------------------------------
 .../eventhubs/spout/SpoutOutputCollectorMock.java     | 10 +++-------
 .../storm/trident/spout/RichSpoutBatchExecutor.java   | 14 +++++---------
 .../storm/trident/spout/RichSpoutBatchTriggerer.java  | 14 +++++++-------
 3 files changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 9f33c89..df4a3ba 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -17,18 +17,16 @@
  *******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-
 import java.util.List;
 
+import backtype.storm.spout.ISpoutOutputCollector;
+
 /**
  * Mock of ISpoutOutputCollector
  */
 public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
   //comma separated offsets
   StringBuilder emittedOffset;
-  SpoutOutputCollector _collector;
   
   public SpoutOutputCollectorMock() {
     emittedOffset = new StringBuilder();
@@ -63,8 +61,6 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
 
   @Override
   public long getPendingCount() {
-    return _collector.getPendingCount();
+    return 0;
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index b81953d..ab9fd4b 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -24,13 +24,12 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.RotatingMap;
-import storm.trident.operation.TridentCollector;
-import storm.trident.topology.TransactionAttempt;
-import storm.trident.util.TridentUtils;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import storm.trident.operation.TridentCollector;
+import storm.trident.topology.TransactionAttempt;
+import storm.trident.util.TridentUtils;
 
 public class RichSpoutBatchExecutor implements ITridentSpout {
     public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
@@ -82,8 +81,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
             idsMap = new RotatingMap(3);
             rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
         }
-
-
+        
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
             long txid = tx.getTransactionId();
@@ -140,8 +138,6 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
                 }
             }
         }
-
-
         
         @Override
         public void close() {
@@ -198,7 +194,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
         public void emitDirect(int task, String stream, List<Object> values, Object id) {
             throw new UnsupportedOperationException("Trident does not support direct streams");
         }
-
+        
         @Override
         public long getPendingCount() {
             return pendingCount;

http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index ae6fedf..0380728 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
-import java.util.ArrayList;		
-import java.util.HashMap;		
-import java.util.HashSet;		
-import java.util.List;		
-import java.util.Map;		
-import java.util.Random;		
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import storm.trident.topology.TridentBoltExecutor;
 import storm.trident.tuple.ConsList;
@@ -173,7 +173,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
         public void reportError(Throwable t) {
             _collector.reportError(t);
         }
-
+        
         @Override
         public long getPendingCount() {
             return _collector.getPendingCount();


[4/4] storm git commit: add STORM-67 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-67 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54f6b32f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54f6b32f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54f6b32f

Branch: refs/heads/master
Commit: 54f6b32f6bcf77032db5aa29183426bfbe3c0a90
Parents: fd21937
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jul 18 11:53:43 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jul 18 11:53:43 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/54f6b32f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3576cff..570f513 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,13 +11,14 @@
  * STORM-934: The current doc for topology ackers is outdated
  * STORM-160: Allow ShellBolt to set env vars (particularly PATH)
  * STORM-937: Changing noisy log level from info to debug
- * STORM-931: Python Scritps to Produce Formatted JIRA and GitHub Join
+ * STORM-931: Python Scripts to Produce Formatted JIRA and GitHub Join
  * STORM-924: Set the file mode of the files included when packaging release packages
  * STORM-799: Use IErrorReport interface more broadly
  * STORM-926: change pom to use maven-shade-plugin:2.2
  * STORM-793: Made change to logviewer.clj in order to remove the invalid http 500 response
  * STORM-857: create logs metadata dir when running securely
  * STORM-942: Add FluxParser method parseInputStream() to eliminate disk usage
+ * STORM-67: Provide API for spouts to know how many pending messages there are
 
 ## 0.10.0-beta2
  * STORM-843: [storm-redis] Add Javadoc to storm-redis