You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/02/05 13:55:24 UTC
[1/3] storm git commit: [STORM-1517] add peek api in trident stream
Repository: storm
Updated Branches:
refs/heads/1.x-branch 7090be454 -> ef7581b19
[STORM-1517] add peek api in trident stream
Similar to the Java 8 peek, the peek api can be used to examine trident tuples at
some point in the stream pipeline or execute some custom actions.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96bde69a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96bde69a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96bde69a
Branch: refs/heads/1.x-branch
Commit: 96bde69a0e0790546427fd1c4a29ae5e4e4af7ae
Parents: 7090be4
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Feb 2 16:42:11 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 21:45:18 2016 +0900
----------------------------------------------------------------------
.../starter/trident/TridentMapExample.java | 7 ++++
.../jvm/org/apache/storm/trident/Stream.java | 21 +++++++++++
.../storm/trident/operation/Consumer.java | 35 ++++++++++++++++++
.../operation/impl/ConsumerExecutor.java | 38 ++++++++++++++++++++
4 files changed, 101 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index 95b52cc..fbb9127 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -25,6 +25,7 @@ import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.operation.Filter;
import org.apache.storm.trident.operation.FlatMapFunction;
import org.apache.storm.trident.operation.MapFunction;
@@ -84,6 +85,12 @@ public class TridentMapExample {
.flatMap(split)
.map(toUpper)
.filter(theFilter)
+ .peek(new Consumer() {
+ @Override
+ public void accept(TridentTuple input) {
+ System.out.println(input.getString(0));
+ }
+ })
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(16);
http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index dffc984..7c6d93f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -21,8 +21,10 @@ import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.operation.FlatMapFunction;
import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.impl.ConsumerExecutor;
import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
import org.apache.storm.trident.planner.processor.MapProcessor;
@@ -387,6 +389,25 @@ public class Stream implements IAggregatableStream {
new MapProcessor(getOutputFields(), new FlatMapFunctionExecutor(function))));
}
+ /**
+ * Returns a stream consisting of the trident tuples of this stream, additionally performing the provided action on
+ * each trident tuple as they are consumed from the resulting stream. This is mostly useful for debugging
+ * to see the tuples as they flow past a certain point in a pipeline.
+ *
+ * @param action the action to perform on the trident tuple as they are consumed from the stream
+ * @return the new stream
+ */
+ public Stream peek(Consumer action) {
+ projectionValidation(getOutputFields());
+ return _topology.addSourcedNode(this,
+ new ProcessorNode(
+ _topology.getUniqueStreamId(),
+ _name,
+ getOutputFields(),
+ getOutputFields(),
+ new MapProcessor(getOutputFields(), new ConsumerExecutor(action))));
+ }
+
public ChainedAggregatorDeclarer chainedAgg() {
return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java
new file mode 100644
index 0000000..dd13b48
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+/**
+ * Represents an operation that accepts a single input argument and returns no result.
+ * This is similar to the Consumer interface in Java 8.
+ */
+public interface Consumer extends Serializable {
+ /**
+ * Performs the operation on the input trident tuple.
+ *
+ * @param input the input trident tuple
+ */
+ void accept(TridentTuple input);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java
new file mode 100644
index 0000000..c08a8f7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.operation.BaseOperation;
+import org.apache.storm.trident.operation.Consumer;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class ConsumerExecutor extends BaseOperation implements Function {
+ private final Consumer consumer;
+
+ public ConsumerExecutor(Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ consumer.accept(tuple);
+ collector.emit(tuple);
+ }
+}
[2/3] storm git commit: Merge branch 'STORM-1517-1.x' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-1517-1.x' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dba93672
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dba93672
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dba93672
Branch: refs/heads/1.x-branch
Commit: dba93672b6d05670785a61d343a4f99c79ee5846
Parents: 7090be4 96bde69
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 5 21:53:49 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 21:53:49 2016 +0900
----------------------------------------------------------------------
.../starter/trident/TridentMapExample.java | 7 ++++
.../jvm/org/apache/storm/trident/Stream.java | 21 +++++++++++
.../storm/trident/operation/Consumer.java | 35 ++++++++++++++++++
.../operation/impl/ConsumerExecutor.java | 38 ++++++++++++++++++++
4 files changed, 101 insertions(+)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-1517 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1517 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef7581b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef7581b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef7581b1
Branch: refs/heads/1.x-branch
Commit: ef7581b19ff7fd94f8183d65c94830d49281d6fa
Parents: dba9367
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 5 21:55:02 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 21:55:02 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ef7581b1/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index df1ddb6..56529ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-1517: Add peek api in trident stream
* STORM-1455: kafka spout should not reset to the beginning of partition when offsetoutofrange exception occurs
* STORM-1518: Backport of STORM-1504
* STORM-1505: Add map, flatMap and filter functions in trident stream