You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/02/03 22:59:35 UTC
[2/6] storm git commit: Addressed review comments
Addressed review comments
Changed input to the map and flatMap functions from Values to TridentTuple to
make them easier to use.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77325fdb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77325fdb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77325fdb
Branch: refs/heads/1.x-branch
Commit: 77325fdbcc506fa9c75a7b2e39f5b24fcca1e58b
Parents: 1832c36
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Thu Jan 28 12:46:31 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Jan 28 14:16:19 2016 +0530
----------------------------------------------------------------------
.../storm/starter/trident/TridentMapExample.java | 9 +++++----
.../storm/trident/operation/FlatMapFunction.java | 7 ++++---
.../apache/storm/trident/operation/MapFunction.java | 9 +++++----
.../operation/impl/FlatMapFunctionExecutor.java | 15 +++------------
.../trident/operation/impl/MapFunctionExecutor.java | 15 +++------------
.../trident/planner/processor/MapProcessor.java | 2 +-
6 files changed, 21 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c493e04..c19e9ee 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -32,6 +32,7 @@ import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
@@ -46,16 +47,16 @@ public class TridentMapExample {
private static MapFunction toUpper = new MapFunction() {
@Override
- public Values execute(Values input) {
- return new Values(((String) input.get(0)).toUpperCase());
+ public Values execute(TridentTuple input) {
+ return new Values(input.getStringByField("word").toUpperCase());
}
};
private static FlatMapFunction split = new FlatMapFunction() {
@Override
- public Iterable<Values> execute(Values input) {
+ public Iterable<Values> execute(TridentTuple input) {
List<Values> valuesList = new ArrayList<>();
- for (String word : ((String) input.get(0)).split(" ")) {
+ for (String word : input.getString(0).split(" ")) {
valuesList.add(new Values(word));
}
return valuesList;
http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
index b19f811..df9f2c6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.trident.operation;
+import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import java.io.Serializable;
@@ -27,10 +28,10 @@ import java.util.List;
*/
public interface FlatMapFunction extends Serializable {
/**
- * Invoked by the framework for each value in a stream.
+ * Invoked by the framework for each trident tuple in a stream.
*
- * @param input the input value
+ * @param input the input trident tuple
* @return an iterable over the resultant values
*/
- Iterable<Values> execute(Values input);
+ Iterable<Values> execute(TridentTuple input);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
index 8611afd..0a2f2ec 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.trident.operation;
+import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import java.io.Serializable;
@@ -26,10 +27,10 @@ import java.io.Serializable;
*/
public interface MapFunction extends Serializable {
/**
- * Invoked by the framework for each value in a stream.
+ * Invoked by the framework for each trident tuple in a stream.
*
- * @param input the input value
- * @return the transformed value
+ * @param input the input trident tuple
+ * @return the transformed values
*/
- Values execute(Values input);
+ Values execute(TridentTuple input);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
index 8ab2a17..b94f74e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.trident.operation.impl;
+import org.apache.storm.trident.operation.BaseOperation;
import org.apache.storm.trident.operation.FlatMapFunction;
import org.apache.storm.trident.operation.Function;
import org.apache.storm.trident.operation.TridentCollector;
@@ -26,7 +27,7 @@ import org.apache.storm.tuple.Values;
import java.util.Map;
-public class FlatMapFunctionExecutor implements Function {
+public class FlatMapFunctionExecutor extends BaseOperation implements Function {
private final FlatMapFunction function;
public FlatMapFunctionExecutor(FlatMapFunction function) {
@@ -35,18 +36,8 @@ public class FlatMapFunctionExecutor implements Function {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
- for (Values values : function.execute(new Values(tuple.getValues().toArray()))) {
+ for (Values values : function.execute(tuple)) {
collector.emit(values);
}
}
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- // NOOP
- }
-
- @Override
- public void cleanup() {
- // NOOP
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
index a751570..0eb1f20 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.trident.operation.impl;
+import org.apache.storm.trident.operation.BaseOperation;
import org.apache.storm.trident.operation.Function;
import org.apache.storm.trident.operation.MapFunction;
import org.apache.storm.trident.operation.TridentCollector;
@@ -26,7 +27,7 @@ import org.apache.storm.tuple.Values;
import java.util.Map;
-public class MapFunctionExecutor implements Function {
+public class MapFunctionExecutor extends BaseOperation implements Function {
private final MapFunction function;
public MapFunctionExecutor(MapFunction function) {
@@ -35,16 +36,6 @@ public class MapFunctionExecutor implements Function {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
- collector.emit(function.execute(new Values(tuple.getValues().toArray())));
- }
-
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- // NOOP
- }
-
- @Override
- public void cleanup() {
- // NOOP
+ collector.emit(function.execute(tuple));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
index 2fd0a4c..f7151c9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
@@ -51,7 +51,7 @@ public class MapProcessor implements TridentProcessor {
public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
if(parents.size()!=1) {
- throw new RuntimeException("Each operation can only have one parent");
+ throw new RuntimeException("Map operation can only have one parent");
}
_context = tridentContext;
_collector = new FreshCollector(tridentContext);