You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/02/03 22:59:36 UTC

[3/6] storm git commit: Added filter api

Added filter api

`filter` is a convenience wrapper over `each` that makes it easy to filter out tuples flowing through the pipeline.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b3804148
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b3804148
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b3804148

Branch: refs/heads/1.x-branch
Commit: b38041481e3c1d13db40085eb0bed3e8e944bbd1
Parents: 77325fd
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Feb 2 16:28:59 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Feb 2 20:20:29 2016 +0530

----------------------------------------------------------------------
 .../starter/trident/TridentMapExample.java      | 12 +++++++++-
 .../jvm/org/apache/storm/trident/Stream.java    | 23 +++++++++++++++++++-
 2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b3804148/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c19e9ee..95b52cc 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -24,6 +24,8 @@ import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.Filter;
 import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.MapFunction;
 import org.apache.storm.trident.operation.builtin.Count;
@@ -63,6 +65,13 @@ public class TridentMapExample {
         }
     };
 
+    private static Filter theFilter = new BaseFilter() {
+        @Override
+        public boolean isKeep(TridentTuple tuple) {
+            return tuple.getString(0).equals("THE");
+        }
+    };
+
     public static StormTopology buildTopology(LocalDRPC drpc) {
         FixedBatchSpout spout = new FixedBatchSpout(
                 new Fields("word"), 3, new Values("the cow jumped over the moon"),
@@ -74,6 +83,7 @@ public class TridentMapExample {
         TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)
                 .flatMap(split)
                 .map(toUpper)
+                .filter(theFilter)
                 .groupBy(new Fields("word"))
                 .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                 .parallelismHint(16);
@@ -82,7 +92,7 @@ public class TridentMapExample {
                 .flatMap(split)
                 .groupBy(new Fields("args"))
                 .stateQuery(wordCounts, new Fields("args"), new MapGet(), new Fields("count"))
-                .each(new Fields("count"), new FilterNull())
+                .filter(new FilterNull())
                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
         return topology.build();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b3804148/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 32daa33..dffc984 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -331,6 +331,27 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
+     * Returns a stream consisting of the elements of this stream that match the given filter.
+     *
+     * @param filter the filter to apply to each trident tuple to determine if it should be included.
+     * @return the new stream
+     */
+    public Stream filter(Filter filter) {
+        return each(getOutputFields(), filter);
+    }
+
+    /**
+     * Returns a stream consisting of the elements of this stream whose selected input fields match the given filter.
+     *
+     * @param inputFields the fields of the input trident tuple to be selected.
+     * @param filter      the filter to apply to each trident tuple to determine if it should be included.
+     * @return the new stream
+     */
+    public Stream filter(Fields inputFields, Filter filter) {
+        return each(inputFields, filter);
+    }
+
+    /**
      * Returns a stream consisting of the result of applying the given mapping function to the values of this stream.
      *
      * @param function a mapping function to be applied to each value in this stream.
@@ -365,7 +386,7 @@ public class Stream implements IAggregatableStream {
                                                 getOutputFields(),
                                                 new MapProcessor(getOutputFields(), new FlatMapFunctionExecutor(function))));
     }
-    
+
     public ChainedAggregatorDeclarer chainedAgg() {
         return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
     }