You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/18 04:54:01 UTC
[1/4] storm git commit: STORM-67 Provide API for spouts to know how
many pending messages there are - changes to classes implementing
ISpoutOutputCollector - executor.clj getPendingCount definition
Repository: storm
Updated Branches:
refs/heads/master 5ac306237 -> 54f6b32f6
STORM-67 Provide API for spouts to know how many pending messages there
are
- changes to classes implementing ISpoutOutputCollector
- executor.clj getPendingCount definition
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3d03b931
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3d03b931
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3d03b931
Branch: refs/heads/master
Commit: 3d03b931b66d43fd5dd383c5e0ac23c55b772232
Parents: 27a3a6b
Author: Shyam Rajendran <sr...@yahoo-inc.com>
Authored: Thu Jun 11 12:38:49 2015 -0500
Committer: Shyam Rajendran <rs...@gmail.com>
Committed: Mon Jul 6 10:42:21 2015 -0500
----------------------------------------------------------------------
.../spout/SpoutOutputCollectorMock.java | 13 ++++++++++--
.../src/clj/backtype/storm/daemon/executor.clj | 3 +++
.../storm/spout/ISpoutOutputCollector.java | 1 +
.../storm/spout/SpoutOutputCollector.java | 5 +++++
.../backtype/storm/testing/SpoutTracker.java | 6 ++++++
.../trident/spout/RichSpoutBatchExecutor.java | 21 ++++++++++++++------
.../trident/spout/RichSpoutBatchTriggerer.java | 18 ++++++++++-------
7 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 02e6830..9f33c89 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -17,9 +17,10 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import java.util.List;
-
import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+
+import java.util.List;
/**
* Mock of ISpoutOutputCollector
@@ -27,6 +28,7 @@ import backtype.storm.spout.ISpoutOutputCollector;
public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
//comma separated offsets
StringBuilder emittedOffset;
+ SpoutOutputCollector _collector;
public SpoutOutputCollectorMock() {
emittedOffset = new StringBuilder();
@@ -58,4 +60,11 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
@Override
public void reportError(Throwable arg0) {
}
+
+ @Override
+ public long getPendingCount() {
+ return _collector.getPendingCount();
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 454fd0d..4f5cc75 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -558,6 +558,9 @@
(:user-context task-data)
(SpoutOutputCollector.
(reify ISpoutOutputCollector
+ (^long getPendingCount[this]
+ (.size pending)
+ )
(^List emit [this ^String stream-id ^List tuple ^Object message-id]
(send-spout-msg stream-id tuple message-id nil)
)
http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
index 3cebe43..26a4843 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
@@ -26,5 +26,6 @@ public interface ISpoutOutputCollector {
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
void reportError(Throwable error);
+ long getPendingCount();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
index 7a33026..f23692b 100644
--- a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
@@ -131,4 +131,9 @@ public class SpoutOutputCollector implements ISpoutOutputCollector {
public void reportError(Throwable error) {
_delegate.reportError(error);
}
+
+ @Override
+ public long getPendingCount() {
+ return _delegate.getPendingCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
index 75ba2b8..c4b5ff1 100644
--- a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
+++ b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
@@ -65,6 +65,12 @@ public class SpoutTracker extends BaseRichSpout {
public void reportError(Throwable error) {
_collector.reportError(error);
}
+
+ @Override
+ public long getPendingCount() {
+ return _collector.getPendingCount();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index 345a5a0..b81953d 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -24,13 +24,14 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.RotatingMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.topology.TransactionAttempt;
import storm.trident.util.TridentUtils;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
public class RichSpoutBatchExecutor implements ITridentSpout {
public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
@@ -81,7 +82,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
idsMap = new RotatingMap(3);
rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
}
-
+
+
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
long txid = tx.getTransactionId();
@@ -112,6 +114,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
}
}
idsMap.put(txid, _collector.ids);
+ _collector.pendingCount = idsMap.size();
}
@@ -137,6 +140,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
}
}
}
+
+
@Override
public void close() {
@@ -170,7 +175,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
TridentCollector _collector;
public List<Object> ids;
public int numEmitted;
-
+ public long pendingCount;
public void reset(TridentCollector c) {
_collector = c;
ids = new ArrayList<Object>();
@@ -193,7 +198,11 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
public void emitDirect(int task, String stream, List<Object> values, Object id) {
throw new UnsupportedOperationException("Trident does not support direct streams");
}
-
+
+ @Override
+ public long getPendingCount() {
+ return pendingCount;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index 728d51e..ae6fedf 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.Set;
import storm.trident.topology.TridentBoltExecutor;
import storm.trident.tuple.ConsList;
@@ -173,6 +173,10 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
public void reportError(Throwable t) {
_collector.reportError(t);
}
-
+
+ @Override
+ public long getPendingCount() {
+ return _collector.getPendingCount();
+ }
}
}
[3/4] storm git commit: Merge branch 'STORM-67' of
https://github.com/bourneagain/storm into STORM-67
Posted by ka...@apache.org.
Merge branch 'STORM-67' of https://github.com/bourneagain/storm into STORM-67
Conflicts:
storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd219375
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd219375
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd219375
Branch: refs/heads/master
Commit: fd21937526b4b20ac1670c623645ba131ed512ac
Parents: 5ac3062 0459bc9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jul 18 11:51:15 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jul 18 11:51:15 2015 +0900
----------------------------------------------------------------------
.../storm/eventhubs/spout/SpoutOutputCollectorMock.java | 5 +++++
storm-core/src/clj/backtype/storm/daemon/executor.clj | 3 +++
.../src/jvm/backtype/storm/spout/ISpoutOutputCollector.java | 1 +
.../src/jvm/backtype/storm/spout/SpoutOutputCollector.java | 5 +++++
storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java | 6 ++++++
.../src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java | 7 ++++++-
.../src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java | 4 ++++
7 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fd219375/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fd219375/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
index a7965ee,26a4843..709ae2a
--- a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
@@@ -27,5 -25,7 +27,6 @@@ public interface ISpoutOutputCollector
*/
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
- void reportError(Throwable error);
+ long getPendingCount();
}
[2/4] storm git commit: Incorporating review comments - pending count
for Trident topologies to return the sum of sizes of buckets in the rotating
map
Posted by ka...@apache.org.
Incorporating review comments
- pending count for Trident topologies to return the sum of sizes of
buckets in the rotating map
Merge with master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0459bc97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0459bc97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0459bc97
Branch: refs/heads/master
Commit: 0459bc97f6e7ea80c3212f9bb5cfdc89a1e5450d
Parents: 3d03b93
Author: Shyam Rajendran <rs...@gmail.com>
Authored: Mon Jul 6 10:28:01 2015 -0500
Committer: Shyam Rajendran <rs...@gmail.com>
Committed: Mon Jul 6 10:42:33 2015 -0500
----------------------------------------------------------------------
.../eventhubs/spout/SpoutOutputCollectorMock.java | 10 +++-------
.../storm/trident/spout/RichSpoutBatchExecutor.java | 14 +++++---------
.../storm/trident/spout/RichSpoutBatchTriggerer.java | 14 +++++++-------
3 files changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 9f33c89..df4a3ba 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -17,18 +17,16 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-
import java.util.List;
+import backtype.storm.spout.ISpoutOutputCollector;
+
/**
* Mock of ISpoutOutputCollector
*/
public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
//comma separated offsets
StringBuilder emittedOffset;
- SpoutOutputCollector _collector;
public SpoutOutputCollectorMock() {
emittedOffset = new StringBuilder();
@@ -63,8 +61,6 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
@Override
public long getPendingCount() {
- return _collector.getPendingCount();
+ return 0;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index b81953d..ab9fd4b 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -24,13 +24,12 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.RotatingMap;
-import storm.trident.operation.TridentCollector;
-import storm.trident.topology.TransactionAttempt;
-import storm.trident.util.TridentUtils;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import storm.trident.operation.TridentCollector;
+import storm.trident.topology.TransactionAttempt;
+import storm.trident.util.TridentUtils;
public class RichSpoutBatchExecutor implements ITridentSpout {
public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
@@ -82,8 +81,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
idsMap = new RotatingMap(3);
rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
}
-
-
+
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
long txid = tx.getTransactionId();
@@ -140,8 +138,6 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
}
}
}
-
-
@Override
public void close() {
@@ -198,7 +194,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
public void emitDirect(int task, String stream, List<Object> values, Object id) {
throw new UnsupportedOperationException("Trident does not support direct streams");
}
-
+
@Override
public long getPendingCount() {
return pendingCount;
http://git-wip-us.apache.org/repos/asf/storm/blob/0459bc97/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index ae6fedf..0380728 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.Set;
import storm.trident.topology.TridentBoltExecutor;
import storm.trident.tuple.ConsList;
@@ -173,7 +173,7 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
public void reportError(Throwable t) {
_collector.reportError(t);
}
-
+
@Override
public long getPendingCount() {
return _collector.getPendingCount();
[4/4] storm git commit: add STORM-67 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-67 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54f6b32f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54f6b32f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54f6b32f
Branch: refs/heads/master
Commit: 54f6b32f6bcf77032db5aa29183426bfbe3c0a90
Parents: fd21937
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jul 18 11:53:43 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jul 18 11:53:43 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/54f6b32f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3576cff..570f513 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,13 +11,14 @@
* STORM-934: The current doc for topology ackers is outdated
* STORM-160: Allow ShellBolt to set env vars (particularly PATH)
* STORM-937: Changing noisy log level from info to debug
- * STORM-931: Python Scritps to Produce Formatted JIRA and GitHub Join
+ * STORM-931: Python Scripts to Produce Formatted JIRA and GitHub Join
* STORM-924: Set the file mode of the files included when packaging release packages
* STORM-799: Use IErrorReport interface more broadly
* STORM-926: change pom to use maven-shade-plugin:2.2
* STORM-793: Made change to logviewer.clj in order to remove the invalid http 500 response
* STORM-857: create logs metadata dir when running securely
* STORM-942: Add FluxParser method parseInputStream() to eliminate disk usage
+ * STORM-67: Provide API for spouts to know how many pending messages there are
## 0.10.0-beta2
* STORM-843: [storm-redis] Add Javadoc to storm-redis