You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/06/21 19:58:14 UTC

[1/3] storm git commit: STORM-1907: PartitionedTridentSpoutExecutor has incompatible types that cause ClassCastException - Generify on Object rather than Integer because example is broken - Fix TridentWordCount example to print more meaningful output

Repository: storm
Updated Branches:
  refs/heads/master 6e67ea7a3 -> 6bf7890b8


STORM-1907: PartitionedTridentSpoutExecutor has incompatible types that cause ClassCastException
 - Generify on Object rather than Integer because the example is broken
 - Fix the TridentWordCount example to print more meaningful output


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aae0acbe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aae0acbe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aae0acbe

Branch: refs/heads/master
Commit: aae0acbe8206f66129443d78831883226f8144c9
Parents: 8ea9e5b
Author: Hugo Louro <hm...@gmail.com>
Authored: Wed Jun 15 17:49:30 2016 -0700
Committer: Hugo Louro <hm...@gmail.com>
Committed: Wed Jun 15 21:41:52 2016 -0700

----------------------------------------------------------------------
 .../storm/starter/trident/TridentWordCount.java | 13 ++++++------
 .../spout/PartitionedTridentSpoutExecutor.java  | 21 ++++++++++----------
 2 files changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aae0acbe/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
index 93ccf18..ac46474 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
@@ -22,8 +22,6 @@ import org.apache.storm.LocalCluster;
 import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.BaseFunction;
@@ -31,10 +29,11 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.operation.builtin.Count;
 import org.apache.storm.trident.operation.builtin.FilterNull;
 import org.apache.storm.trident.operation.builtin.MapGet;
-import org.apache.storm.trident.operation.builtin.Sum;
 import org.apache.storm.trident.testing.FixedBatchSpout;
 import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 
 public class TridentWordCount {
@@ -59,9 +58,11 @@ public class TridentWordCount {
         new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
         new Count(), new Fields("count")).parallelismHint(16);
 
-    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
-        "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
-        new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+    topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word"))
+            .groupBy(new Fields("word"))
+            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
+            .each(new Fields("count"), new FilterNull())
+            .project(new Fields("word", "count"));
     return topology.build();
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/aae0acbe/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
index 389a929..dc9b3d9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -18,30 +18,31 @@
 package org.apache.storm.trident.spout;
 
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.apache.storm.trident.topology.state.RotatingTransactionalState;
+import org.apache.storm.trident.topology.state.TransactionalState;
 import org.apache.storm.tuple.Fields;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.apache.storm.trident.topology.state.RotatingTransactionalState;
-import org.apache.storm.trident.topology.state.TransactionalState;
 
 
 public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> {
-    IPartitionedTridentSpout<Integer, ISpoutPartition, Object> _spout;
+    IPartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
     
-    public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Integer, ISpoutPartition, Object> spout) {
+    public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
         _spout = spout;
     }
     
-    public IPartitionedTridentSpout<Integer, ISpoutPartition, Object> getPartitionedSpout() {
+    public IPartitionedTridentSpout<Object, ISpoutPartition, Object> getPartitionedSpout() {
         return _spout;
     }
     
     class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
-        private IPartitionedTridentSpout.Coordinator<Integer> _coordinator;
+        private IPartitionedTridentSpout.Coordinator<Object> _coordinator;
         
         public Coordinator(Map conf, TopologyContext context) {
             _coordinator = _spout.getCoordinator(conf, context);
@@ -83,7 +84,7 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> {
     }
     
     class Emitter implements ITridentSpout.Emitter<Object> {
-        private IPartitionedTridentSpout.Emitter _emitter;
+        private IPartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
         private TransactionalState _state;
         private Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
         private int _index;
@@ -150,7 +151,7 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> {
     }    
 
     @Override
-    public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
+    public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
         return new Coordinator(conf, context);
     }
 


[3/3] storm git commit: Added STORM-1907 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1907 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6bf7890b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6bf7890b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6bf7890b

Branch: refs/heads/master
Commit: 6bf7890b8956ab982a1ff1bbfe94d8b622ac13a1
Parents: b16a750
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Jun 21 12:57:23 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Jun 21 12:57:23 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6bf7890b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f4e758e..8c34d64 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -107,6 +107,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0 
+ * STORM-1907: PartitionedTridentSpoutExecutor has incompatible types that cause ClassCastException
  * STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.
  * STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit
  * STORM-1849: HDFSFileTopology should use the 3rd argument as topologyName


[2/3] storm git commit: Merge branch 'master_STORM-1907' of https://github.com/hmcl/storm-apache into STORM-1907

Posted by sr...@apache.org.
Merge branch 'master_STORM-1907' of https://github.com/hmcl/storm-apache into STORM-1907


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b16a7507
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b16a7507
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b16a7507

Branch: refs/heads/master
Commit: b16a75070706f01adc571d09f7606f1311d1e7e9
Parents: 6e67ea7 aae0acb
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Jun 21 11:41:07 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Jun 21 11:41:07 2016 -0700

----------------------------------------------------------------------
 .../storm/starter/trident/TridentWordCount.java | 13 ++++++------
 .../spout/PartitionedTridentSpoutExecutor.java  | 21 ++++++++++----------
 2 files changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------