You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/06/21 19:58:14 UTC
[1/3] storm git commit: STORM-1907: PartitionedTridentSpoutExecutor
has incompatible types that cause ClassCastException - Generify on Object
rather than Integer because example is broken - Fix TridentWordCount example
to print more meaningful output
Repository: storm
Updated Branches:
refs/heads/master 6e67ea7a3 -> 6bf7890b8
STORM-1907: PartitionedTridentSpoutExecutor has incompatible types that cause ClassCastException
- Generify on Object rather than Integer because example is broken
- Fix TridentWordCount example to print more meaningful output
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aae0acbe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aae0acbe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aae0acbe
Branch: refs/heads/master
Commit: aae0acbe8206f66129443d78831883226f8144c9
Parents: 8ea9e5b
Author: Hugo Louro <hm...@gmail.com>
Authored: Wed Jun 15 17:49:30 2016 -0700
Committer: Hugo Louro <hm...@gmail.com>
Committed: Wed Jun 15 21:41:52 2016 -0700
----------------------------------------------------------------------
.../storm/starter/trident/TridentWordCount.java | 13 ++++++------
.../spout/PartitionedTridentSpoutExecutor.java | 21 ++++++++++----------
2 files changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/aae0acbe/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
index 93ccf18..ac46474 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -22,8 +22,6 @@ import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
@@ -31,10 +29,11 @@ import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.FilterNull;
import org.apache.storm.trident.operation.builtin.MapGet;
-import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
public class TridentWordCount {
@@ -59,9 +58,11 @@ public class TridentWordCount {
new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count")).parallelismHint(16);
- topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
- "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
- new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+ topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word"))
+ .groupBy(new Fields("word"))
+ .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
+ .each(new Fields("count"), new FilterNull())
+ .project(new Fields("word", "count"));
return topology.build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/aae0acbe/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
index 389a929..dc9b3d9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -18,30 +18,31 @@
package org.apache.storm.trident.spout;
import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.apache.storm.trident.topology.state.RotatingTransactionalState;
+import org.apache.storm.trident.topology.state.TransactionalState;
import org.apache.storm.tuple.Fields;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.apache.storm.trident.topology.state.RotatingTransactionalState;
-import org.apache.storm.trident.topology.state.TransactionalState;
public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> {
- IPartitionedTridentSpout<Integer, ISpoutPartition, Object> _spout;
+ IPartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
- public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Integer, ISpoutPartition, Object> spout) {
+ public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
_spout = spout;
}
- public IPartitionedTridentSpout<Integer, ISpoutPartition, Object> getPartitionedSpout() {
+ public IPartitionedTridentSpout<Object, ISpoutPartition, Object> getPartitionedSpout() {
return _spout;
}
class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
- private IPartitionedTridentSpout.Coordinator<Integer> _coordinator;
+ private IPartitionedTridentSpout.Coordinator<Object> _coordinator;
public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
@@ -83,7 +84,7 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> {
}
class Emitter implements ITridentSpout.Emitter<Object> {
- private IPartitionedTridentSpout.Emitter _emitter;
+ private IPartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
private TransactionalState _state;
private Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
private int _index;
@@ -150,7 +151,7 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> {
}
@Override
- public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
+ public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return new Coordinator(conf, context);
}
[3/3] storm git commit: Added STORM-1907 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1907 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6bf7890b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6bf7890b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6bf7890b
Branch: refs/heads/master
Commit: 6bf7890b8956ab982a1ff1bbfe94d8b622ac13a1
Parents: b16a750
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Jun 21 12:57:23 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Jun 21 12:57:23 2016 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6bf7890b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f4e758e..8c34d64 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -107,6 +107,7 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.0
+ * STORM-1907: PartitionedTridentSpoutExecutor has incompatible types that cause ClassCastException
* STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.
* STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit
* STORM-1849: HDFSFileTopology should use the 3rd argument as topologyName
[2/3] storm git commit: Merge branch 'master_STORM-1907' of
https://github.com/hmcl/storm-apache into STORM-1907
Posted by sr...@apache.org.
Merge branch 'master_STORM-1907' of https://github.com/hmcl/storm-apache into STORM-1907
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b16a7507
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b16a7507
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b16a7507
Branch: refs/heads/master
Commit: b16a75070706f01adc571d09f7606f1311d1e7e9
Parents: 6e67ea7 aae0acb
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Jun 21 11:41:07 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Jun 21 11:41:07 2016 -0700
----------------------------------------------------------------------
.../storm/starter/trident/TridentWordCount.java | 13 ++++++------
.../spout/PartitionedTridentSpoutExecutor.java | 21 ++++++++++----------
2 files changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------