You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/02/03 22:59:36 UTC
[3/6] storm git commit: Added filter api
Added filter api
`filter` is a wrapper over `each` that makes it easy to filter out tuples flowing through the pipeline.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b3804148
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b3804148
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b3804148
Branch: refs/heads/1.x-branch
Commit: b38041481e3c1d13db40085eb0bed3e8e944bbd1
Parents: 77325fd
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Feb 2 16:28:59 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Feb 2 20:20:29 2016 +0530
----------------------------------------------------------------------
.../starter/trident/TridentMapExample.java | 12 +++++++++-
.../jvm/org/apache/storm/trident/Stream.java | 23 +++++++++++++++++++-
2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b3804148/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index c19e9ee..95b52cc 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -24,6 +24,8 @@ import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.Filter;
import org.apache.storm.trident.operation.FlatMapFunction;
import org.apache.storm.trident.operation.MapFunction;
import org.apache.storm.trident.operation.builtin.Count;
@@ -63,6 +65,13 @@ public class TridentMapExample {
}
};
+ private static Filter theFilter = new BaseFilter() {
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ return tuple.getString(0).equals("THE");
+ }
+ };
+
public static StormTopology buildTopology(LocalDRPC drpc) {
FixedBatchSpout spout = new FixedBatchSpout(
new Fields("word"), 3, new Values("the cow jumped over the moon"),
@@ -74,6 +83,7 @@ public class TridentMapExample {
TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)
.flatMap(split)
.map(toUpper)
+ .filter(theFilter)
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(16);
@@ -82,7 +92,7 @@ public class TridentMapExample {
.flatMap(split)
.groupBy(new Fields("args"))
.stateQuery(wordCounts, new Fields("args"), new MapGet(), new Fields("count"))
- .each(new Fields("count"), new FilterNull())
+ .filter(new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
return topology.build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b3804148/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 32daa33..dffc984 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -331,6 +331,27 @@ public class Stream implements IAggregatableStream {
}
/**
+ * Returns a stream consisting of the elements of this stream that match the given filter.
+ *
+ * @param filter the filter to apply to each trident tuple to determine if it should be included.
+ * @return the new stream
+ */
+ public Stream filter(Filter filter) {
+ return each(getOutputFields(), filter);
+ }
+
+ /**
+ * Returns a stream consisting of the elements of this stream that match the given filter, evaluated on the selected input fields.
+ *
+ * @param inputFields the fields of the input trident tuple to be selected.
+ * @param filter the filter to apply to each trident tuple to determine if it should be included.
+ * @return the new stream
+ */
+ public Stream filter(Fields inputFields, Filter filter) {
+ return each(inputFields, filter);
+ }
+
+ /**
* Returns a stream consisting of the result of applying the given mapping function to the values of this stream.
*
* @param function a mapping function to be applied to each value in this stream.
@@ -365,7 +386,7 @@ public class Stream implements IAggregatableStream {
getOutputFields(),
new MapProcessor(getOutputFields(), new FlatMapFunctionExecutor(function))));
}
-
+
public ChainedAggregatorDeclarer chainedAgg() {
return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
}