You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/02/03 22:59:35 UTC

[2/6] storm git commit: Addressed review comments

Addressed review comments

Changed the input to the map and flatMap functions from Values to TridentTuple to
make them easier to use.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77325fdb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77325fdb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77325fdb

Branch: refs/heads/1.x-branch
Commit: 77325fdbcc506fa9c75a7b2e39f5b24fcca1e58b
Parents: 1832c36
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Thu Jan 28 12:46:31 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Jan 28 14:16:19 2016 +0530

----------------------------------------------------------------------
 .../storm/starter/trident/TridentMapExample.java     |  9 +++++----
 .../storm/trident/operation/FlatMapFunction.java     |  7 ++++---
 .../apache/storm/trident/operation/MapFunction.java  |  9 +++++----
 .../operation/impl/FlatMapFunctionExecutor.java      | 15 +++------------
 .../trident/operation/impl/MapFunctionExecutor.java  | 15 +++------------
 .../trident/planner/processor/MapProcessor.java      |  2 +-
 6 files changed, 21 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c493e04..c19e9ee 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -32,6 +32,7 @@ import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.operation.builtin.Sum;
 import org.apache.storm.trident.testing.FixedBatchSpout;
 import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
@@ -46,16 +47,16 @@ public class TridentMapExample {
 
     private static MapFunction toUpper = new MapFunction() {
         @Override
-        public Values execute(Values input) {
-            return new Values(((String) input.get(0)).toUpperCase());
+        public Values execute(TridentTuple input) {
+            return new Values(input.getStringByField("word").toUpperCase());
         }
     };
 
     private static FlatMapFunction split = new FlatMapFunction() {
         @Override
-        public Iterable<Values> execute(Values input) {
+        public Iterable<Values> execute(TridentTuple input) {
             List<Values> valuesList = new ArrayList<>();
-            for (String word : ((String) input.get(0)).split(" ")) {
+            for (String word : input.getString(0).split(" ")) {
                 valuesList.add(new Values(word));
             }
             return valuesList;

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
index b19f811..df9f2c6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation;
 
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Values;
 
 import java.io.Serializable;
@@ -27,10 +28,10 @@ import java.util.List;
  */
 public interface FlatMapFunction extends Serializable {
     /**
-     * Invoked by the framework for each value in a stream.
+     * Invoked by the framework for each trident tuple in a stream.
      *
-     * @param input the input value
+     * @param input the input trident tuple
      * @return an iterable over the resultant values
      */
-    Iterable<Values> execute(Values input);
+    Iterable<Values> execute(TridentTuple input);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
index 8611afd..0a2f2ec 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation;
 
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Values;
 
 import java.io.Serializable;
@@ -26,10 +27,10 @@ import java.io.Serializable;
  */
 public interface MapFunction extends Serializable {
     /**
-     * Invoked by the framework for each value in a stream.
+     * Invoked by the framework for each trident tuple in a stream.
      *
-     * @param input the input value
-     * @return the transformed value
+     * @param input the input trident tuple
+     * @return the transformed values
      */
-    Values execute(Values input);
+    Values execute(TridentTuple input);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
index 8ab2a17..b94f74e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation.impl;
 
+import org.apache.storm.trident.operation.BaseOperation;
 import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.Function;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -26,7 +27,7 @@ import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 
-public class FlatMapFunctionExecutor implements Function {
+public class FlatMapFunctionExecutor extends BaseOperation implements Function {
     private final FlatMapFunction function;
 
     public FlatMapFunctionExecutor(FlatMapFunction function) {
@@ -35,18 +36,8 @@ public class FlatMapFunctionExecutor implements Function {
 
     @Override
     public void execute(TridentTuple tuple, TridentCollector collector) {
-        for (Values values : function.execute(new Values(tuple.getValues().toArray()))) {
+        for (Values values : function.execute(tuple)) {
             collector.emit(values);
         }
     }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        // NOOP
-    }
-
-    @Override
-    public void cleanup() {
-        // NOOP
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
index a751570..0eb1f20 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation.impl;
 
+import org.apache.storm.trident.operation.BaseOperation;
 import org.apache.storm.trident.operation.Function;
 import org.apache.storm.trident.operation.MapFunction;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -26,7 +27,7 @@ import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 
-public class MapFunctionExecutor implements Function {
+public class MapFunctionExecutor extends BaseOperation implements Function {
     private final MapFunction function;
 
     public MapFunctionExecutor(MapFunction function) {
@@ -35,16 +36,6 @@ public class MapFunctionExecutor implements Function {
 
     @Override
     public void execute(TridentTuple tuple, TridentCollector collector) {
-        collector.emit(function.execute(new Values(tuple.getValues().toArray())));
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        // NOOP
-    }
-
-    @Override
-    public void cleanup() {
-        // NOOP
+        collector.emit(function.execute(tuple));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
index 2fd0a4c..f7151c9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
@@ -51,7 +51,7 @@ public class MapProcessor implements TridentProcessor {
     public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
         List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
         if(parents.size()!=1) {
-            throw new RuntimeException("Each operation can only have one parent");
+            throw new RuntimeException("Map operation can only have one parent");
         }
         _context = tridentContext;
         _collector = new FreshCollector(tridentContext);