You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/02/03 22:59:24 UTC

[1/5] storm git commit: [STORM-1505] Add map and flatMap functions in trident stream

Repository: storm
Updated Branches:
  refs/heads/master 3d9481f40 -> 695f8c931


[STORM-1505] Add map and flatMap functions in trident stream

map and flatMap are common stream operations. Right now in Trident they have to be
implemented via each(), which also emits the input field values
in addition to the mapped field values, so dedicated map and flatMap operations should make
things slightly more efficient and easier to use.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1832c369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1832c369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1832c369

Branch: refs/heads/master
Commit: 1832c369735576df61c6533ff772814b537e5d68
Parents: ecce1ce
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Jan 27 12:32:33 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Jan 27 16:04:59 2016 +0530

----------------------------------------------------------------------
 .../starter/trident/TridentMapExample.java      | 105 +++++++++++++++++++
 .../jvm/org/apache/storm/trident/Stream.java    |  43 +++++++-
 .../trident/operation/FlatMapFunction.java      |  36 +++++++
 .../storm/trident/operation/MapFunction.java    |  35 +++++++
 .../operation/impl/FlatMapFunctionExecutor.java |  52 +++++++++
 .../operation/impl/MapFunctionExecutor.java     |  50 +++++++++
 .../trident/planner/processor/MapProcessor.java |  87 +++++++++++++++
 7 files changed, 407 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
new file mode 100644
index 0000000..c493e04
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.FilterNull;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A simple example that demonstrates the usage of {@link org.apache.storm.trident.Stream#map(MapFunction)} and
+ * {@link org.apache.storm.trident.Stream#flatMap(FlatMapFunction)} functions.
+ */
+public class TridentMapExample {
+
+    private static MapFunction toUpper = new MapFunction() {
+        @Override
+        public Values execute(Values input) {
+            return new Values(((String) input.get(0)).toUpperCase());
+        }
+    };
+
+    private static FlatMapFunction split = new FlatMapFunction() {
+        @Override
+        public Iterable<Values> execute(Values input) {
+            List<Values> valuesList = new ArrayList<>();
+            for (String word : ((String) input.get(0)).split(" ")) {
+                valuesList.add(new Values(word));
+            }
+            return valuesList;
+        }
+    };
+
+    public static StormTopology buildTopology(LocalDRPC drpc) {
+        FixedBatchSpout spout = new FixedBatchSpout(
+                new Fields("word"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)
+                .flatMap(split)
+                .map(toUpper)
+                .groupBy(new Fields("word"))
+                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
+                .parallelismHint(16);
+
+        topology.newDRPCStream("words", drpc)
+                .flatMap(split)
+                .groupBy(new Fields("args"))
+                .stateQuery(wordCounts, new Fields("args"), new MapGet(), new Fields("count"))
+                .each(new Fields("count"), new FilterNull())
+                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        if (args.length == 0) {
+            LocalDRPC drpc = new LocalDRPC();
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
+            for (int i = 0; i < 100; i++) {
+                System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
+                Thread.sleep(1000);
+            }
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index fb2497a..32daa33 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -21,6 +21,11 @@ import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
+import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
+import org.apache.storm.trident.planner.processor.MapProcessor;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
@@ -323,7 +328,43 @@ public class Stream implements IAggregatableStream {
     
     public Stream each(Fields inputFields, Filter filter) {
         return each(inputFields, new FilterExecutor(filter), new Fields());
-    }    
+    }
+
+    /**
+     * Returns a stream consisting of the result of applying the given mapping function to the values of this stream.
+     *
+     * @param function a mapping function to be applied to each value in this stream.
+     * @return the new stream
+     */
+    public Stream map(MapFunction function) {
+        projectionValidation(getOutputFields());
+        return _topology.addSourcedNode(this,
+                                        new ProcessorNode(
+                                                _topology.getUniqueStreamId(),
+                                                _name,
+                                                getOutputFields(),
+                                                getOutputFields(),
+                                                new MapProcessor(getOutputFields(), new MapFunctionExecutor(function))));
+    }
+
+    /**
+     * Returns a stream consisting of the results of replacing each value of this stream with the contents
+     * produced by applying the provided mapping function to each value. This has the effect of applying
+     * a one-to-many transformation to the values of the stream, and then flattening the resulting elements into a new stream.
+     *
+     * @param function a mapping function to be applied to each value in this stream which produces new values.
+     * @return the new stream
+     */
+    public Stream flatMap(FlatMapFunction function) {
+        projectionValidation(getOutputFields());
+        return _topology.addSourcedNode(this,
+                                        new ProcessorNode(
+                                                _topology.getUniqueStreamId(),
+                                                _name,
+                                                getOutputFields(),
+                                                getOutputFields(),
+                                                new MapProcessor(getOutputFields(), new FlatMapFunctionExecutor(function))));
+    }
     
     public ChainedAggregatorDeclarer chainedAgg() {
         return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
new file mode 100644
index 0000000..b19f811
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A one to many transformation function
+ */
+public interface FlatMapFunction extends Serializable {
+    /**
+     * Invoked by the framework for each value in a stream.
+     *
+     * @param input the input value
+     * @return an iterable over the resultant values
+     */
+    Iterable<Values> execute(Values input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
new file mode 100644
index 0000000..8611afd
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+
+/**
+ * A one-one transformation function
+ */
+public interface MapFunction extends Serializable {
+    /**
+     * Invoked by the framework for each value in a stream.
+     *
+     * @param input the input value
+     * @return the transformed value
+     */
+    Values execute(Values input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
new file mode 100644
index 0000000..8ab2a17
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+public class FlatMapFunctionExecutor implements Function {
+    private final FlatMapFunction function;
+
+    public FlatMapFunctionExecutor(FlatMapFunction function) {
+        this.function = function;
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        for (Values values : function.execute(new Values(tuple.getValues().toArray()))) {
+            collector.emit(values);
+        }
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        // NOOP
+    }
+
+    @Override
+    public void cleanup() {
+        // NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
new file mode 100644
index 0000000..a751570
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+public class MapFunctionExecutor implements Function {
+    private final MapFunction function;
+
+    public MapFunctionExecutor(MapFunction function) {
+        this.function = function;
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        collector.emit(function.execute(new Values(tuple.getValues().toArray())));
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        // NOOP
+    }
+
+    @Override
+    public void cleanup() {
+        // NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1832c369/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
new file mode 100644
index 0000000..2fd0a4c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.planner.processor;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.FlatMapFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.planner.ProcessorContext;
+import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processor for executing {@link org.apache.storm.trident.Stream#map(MapFunction)} and
+ * {@link org.apache.storm.trident.Stream#flatMap(FlatMapFunction)} functions.
+ */
+public class MapProcessor implements TridentProcessor {
+    Function _function;
+    TridentContext _context;
+    FreshCollector _collector;
+    Fields _inputFields;
+    TridentTupleView.ProjectionFactory _projection;
+
+    public MapProcessor(Fields inputFields, Function function) {
+        _function = function;
+        _inputFields = inputFields;
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
+        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
+        if(parents.size()!=1) {
+            throw new RuntimeException("Each operation can only have one parent");
+        }
+        _context = tridentContext;
+        _collector = new FreshCollector(tridentContext);
+        _projection = new TridentTupleView.ProjectionFactory(parents.get(0), _inputFields);
+        _function.prepare(conf, new TridentOperationContext(context, _projection));
+    }
+
+    @Override
+    public void cleanup() {
+        _function.cleanup();
+    }
+
+    @Override
+    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
+        _collector.setContext(processorContext);
+        _function.execute(_projection.create(tuple), _collector);
+    }
+
+    @Override
+    public void startBatch(ProcessorContext processorContext) {
+        // NOOP
+    }
+
+    @Override
+    public void finishBatch(ProcessorContext processorContext) {
+        // NOOP
+    }
+
+    @Override
+    public TridentTuple.Factory getOutputFactory() {
+        return _collector.getOutputFactory();
+    }
+}


[5/5] storm git commit: add STORM-1505 to changelog

Posted by pt...@apache.org.
add STORM-1505 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/695f8c93
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/695f8c93
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/695f8c93

Branch: refs/heads/master
Commit: 695f8c931e85181d7b969397c831ae5c8adc183a
Parents: f1f4a5f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 3 16:58:03 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 3 16:58:03 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/695f8c93/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cece4a0..51b5dea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
  * STORM-1504: Add Serializer and instruction for AvroGenericRecordBolt
 
 ## 1.0.0
+ * STORM-1505: Add map, flatMap and filter functions in trident stream
  * STORM-1518: Backport of STORM-1504
  * STORM-1510: Fix broken nimbus log link
  * STORM-1503: Worker should not crash on failure to send heartbeats to Pacemaker/ZK


[4/5] storm git commit: Merge branch 'STORM-1505' of github.com:arunmahadevan/storm

Posted by pt...@apache.org.
Merge branch 'STORM-1505' of github.com:arunmahadevan/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f1f4a5fc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f1f4a5fc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f1f4a5fc

Branch: refs/heads/master
Commit: f1f4a5fc813350f31c6f921be9c78d8dc005ed49
Parents: 3d9481f b380414
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 3 16:57:18 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 3 16:57:18 2016 -0500

----------------------------------------------------------------------
 .../starter/trident/TridentMapExample.java      | 116 +++++++++++++++++++
 .../jvm/org/apache/storm/trident/Stream.java    |  66 ++++++++++-
 .../trident/operation/FlatMapFunction.java      |  37 ++++++
 .../storm/trident/operation/MapFunction.java    |  36 ++++++
 .../operation/impl/FlatMapFunctionExecutor.java |  43 +++++++
 .../operation/impl/MapFunctionExecutor.java     |  41 +++++++
 .../trident/planner/processor/MapProcessor.java |  87 ++++++++++++++
 7 files changed, 424 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[3/5] storm git commit: Added filter api

Posted by pt...@apache.org.
Added filter api

filter is a convenience wrapper over each() that makes it easy to filter out tuples flowing through the pipeline.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b3804148
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b3804148
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b3804148

Branch: refs/heads/master
Commit: b38041481e3c1d13db40085eb0bed3e8e944bbd1
Parents: 77325fd
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Feb 2 16:28:59 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Feb 2 20:20:29 2016 +0530

----------------------------------------------------------------------
 .../starter/trident/TridentMapExample.java      | 12 +++++++++-
 .../jvm/org/apache/storm/trident/Stream.java    | 23 +++++++++++++++++++-
 2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b3804148/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c19e9ee..95b52cc 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -24,6 +24,8 @@ import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.Filter;
 import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.MapFunction;
 import org.apache.storm.trident.operation.builtin.Count;
@@ -63,6 +65,13 @@ public class TridentMapExample {
         }
     };
 
+    private static Filter theFilter = new BaseFilter() {
+        @Override
+        public boolean isKeep(TridentTuple tuple) {
+            return tuple.getString(0).equals("THE");
+        }
+    };
+
     public static StormTopology buildTopology(LocalDRPC drpc) {
         FixedBatchSpout spout = new FixedBatchSpout(
                 new Fields("word"), 3, new Values("the cow jumped over the moon"),
@@ -74,6 +83,7 @@ public class TridentMapExample {
         TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)
                 .flatMap(split)
                 .map(toUpper)
+                .filter(theFilter)
                 .groupBy(new Fields("word"))
                 .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                 .parallelismHint(16);
@@ -82,7 +92,7 @@ public class TridentMapExample {
                 .flatMap(split)
                 .groupBy(new Fields("args"))
                 .stateQuery(wordCounts, new Fields("args"), new MapGet(), new Fields("count"))
-                .each(new Fields("count"), new FilterNull())
+                .filter(new FilterNull())
                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
         return topology.build();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b3804148/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 32daa33..dffc984 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -331,6 +331,27 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
+     * Returns a stream consisting of the elements of this stream that match the given filter.
+     *
+     * @param filter the filter to apply to each trident tuple to determine if it should be included.
+     * @return the new stream
+     */
+    public Stream filter(Filter filter) {
+        return each(getOutputFields(), filter);
+    }
+
+    /**
+     * Returns a stream consisting of the elements of this stream that match the given filter.
+     *
+     * @param inputFields the fields of the input trident tuple to be selected.
+     * @param filter      the filter to apply to each trident tuple to determine if it should be included.
+     * @return the new stream
+     */
+    public Stream filter(Fields inputFields, Filter filter) {
+        return each(inputFields, filter);
+    }
+
+    /**
      * Returns a stream consisting of the result of applying the given mapping function to the values of this stream.
      *
      * @param function a mapping function to be applied to each value in this stream.
@@ -365,7 +386,7 @@ public class Stream implements IAggregatableStream {
                                                 getOutputFields(),
                                                 new MapProcessor(getOutputFields(), new FlatMapFunctionExecutor(function))));
     }
-    
+
     public ChainedAggregatorDeclarer chainedAgg() {
         return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
     }


[2/5] storm git commit: Addressed review comments

Posted by pt...@apache.org.
Addressed review comments

Changed the input of the map and flatMap functions from Values to TridentTuple to
make them easier to use.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77325fdb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77325fdb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77325fdb

Branch: refs/heads/master
Commit: 77325fdbcc506fa9c75a7b2e39f5b24fcca1e58b
Parents: 1832c36
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Thu Jan 28 12:46:31 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Jan 28 14:16:19 2016 +0530

----------------------------------------------------------------------
 .../storm/starter/trident/TridentMapExample.java     |  9 +++++----
 .../storm/trident/operation/FlatMapFunction.java     |  7 ++++---
 .../apache/storm/trident/operation/MapFunction.java  |  9 +++++----
 .../operation/impl/FlatMapFunctionExecutor.java      | 15 +++------------
 .../trident/operation/impl/MapFunctionExecutor.java  | 15 +++------------
 .../trident/planner/processor/MapProcessor.java      |  2 +-
 6 files changed, 21 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c493e04..c19e9ee 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -32,6 +32,7 @@ import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.operation.builtin.Sum;
 import org.apache.storm.trident.testing.FixedBatchSpout;
 import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
@@ -46,16 +47,16 @@ public class TridentMapExample {
 
     private static MapFunction toUpper = new MapFunction() {
         @Override
-        public Values execute(Values input) {
-            return new Values(((String) input.get(0)).toUpperCase());
+        public Values execute(TridentTuple input) {
+            return new Values(input.getStringByField("word").toUpperCase());
         }
     };
 
     private static FlatMapFunction split = new FlatMapFunction() {
         @Override
-        public Iterable<Values> execute(Values input) {
+        public Iterable<Values> execute(TridentTuple input) {
             List<Values> valuesList = new ArrayList<>();
-            for (String word : ((String) input.get(0)).split(" ")) {
+            for (String word : input.getString(0).split(" ")) {
                 valuesList.add(new Values(word));
             }
             return valuesList;

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
index b19f811..df9f2c6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/FlatMapFunction.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation;
 
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Values;
 
 import java.io.Serializable;
@@ -27,10 +28,10 @@ import java.util.List;
  */
 public interface FlatMapFunction extends Serializable {
     /**
-     * Invoked by the framework for each value in a stream.
+     * Invoked by the framework for each trident tuple in a stream.
      *
-     * @param input the input value
+     * @param input the input trident tuple
      * @return an iterable over the resultant values
      */
-    Iterable<Values> execute(Values input);
+    Iterable<Values> execute(TridentTuple input);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
index 8611afd..0a2f2ec 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/MapFunction.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation;
 
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Values;
 
 import java.io.Serializable;
@@ -26,10 +27,10 @@ import java.io.Serializable;
  */
 public interface MapFunction extends Serializable {
     /**
-     * Invoked by the framework for each value in a stream.
+     * Invoked by the framework for each trident tuple in a stream.
      *
-     * @param input the input value
-     * @return the transformed value
+     * @param input the input trident tuple
+     * @return the transformed values
      */
-    Values execute(Values input);
+    Values execute(TridentTuple input);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
index 8ab2a17..b94f74e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/FlatMapFunctionExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation.impl;
 
+import org.apache.storm.trident.operation.BaseOperation;
 import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.Function;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -26,7 +27,7 @@ import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 
-public class FlatMapFunctionExecutor implements Function {
+public class FlatMapFunctionExecutor extends BaseOperation implements Function {
     private final FlatMapFunction function;
 
     public FlatMapFunctionExecutor(FlatMapFunction function) {
@@ -35,18 +36,8 @@ public class FlatMapFunctionExecutor implements Function {
 
     @Override
     public void execute(TridentTuple tuple, TridentCollector collector) {
-        for (Values values : function.execute(new Values(tuple.getValues().toArray()))) {
+        for (Values values : function.execute(tuple)) {
             collector.emit(values);
         }
     }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        // NOOP
-    }
-
-    @Override
-    public void cleanup() {
-        // NOOP
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
index a751570..0eb1f20 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/MapFunctionExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.operation.impl;
 
+import org.apache.storm.trident.operation.BaseOperation;
 import org.apache.storm.trident.operation.Function;
 import org.apache.storm.trident.operation.MapFunction;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -26,7 +27,7 @@ import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 
-public class MapFunctionExecutor implements Function {
+public class MapFunctionExecutor extends BaseOperation implements Function {
     private final MapFunction function;
 
     public MapFunctionExecutor(MapFunction function) {
@@ -35,16 +36,6 @@ public class MapFunctionExecutor implements Function {
 
     @Override
     public void execute(TridentTuple tuple, TridentCollector collector) {
-        collector.emit(function.execute(new Values(tuple.getValues().toArray())));
-    }
-
-    @Override
-    public void prepare(Map conf, TridentOperationContext context) {
-        // NOOP
-    }
-
-    @Override
-    public void cleanup() {
-        // NOOP
+        collector.emit(function.execute(tuple));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/77325fdb/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
index 2fd0a4c..f7151c9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java
@@ -51,7 +51,7 @@ public class MapProcessor implements TridentProcessor {
     public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
         List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
         if(parents.size()!=1) {
-            throw new RuntimeException("Each operation can only have one parent");
+            throw new RuntimeException("Map operation can only have one parent");
         }
         _context = tridentContext;
         _collector = new FreshCollector(tridentContext);