You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/02/05 13:55:24 UTC

[1/3] storm git commit: [STORM-1517] add peek api in trident stream

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 7090be454 -> ef7581b19


[STORM-1517] add peek api in trident stream

Similar to peek in the Java 8 Stream API, the peek api can be used to examine trident
tuples at some point in the stream pipeline or to execute some custom action on them.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96bde69a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96bde69a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96bde69a

Branch: refs/heads/1.x-branch
Commit: 96bde69a0e0790546427fd1c4a29ae5e4e4af7ae
Parents: 7090be4
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Feb 2 16:42:11 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 21:45:18 2016 +0900

----------------------------------------------------------------------
 .../starter/trident/TridentMapExample.java      |  7 ++++
 .../jvm/org/apache/storm/trident/Stream.java    | 21 +++++++++++
 .../storm/trident/operation/Consumer.java       | 35 ++++++++++++++++++
 .../operation/impl/ConsumerExecutor.java        | 38 ++++++++++++++++++++
 4 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index 95b52cc..fbb9127 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -25,6 +25,7 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.BaseFilter;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.operation.Filter;
 import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.MapFunction;
@@ -84,6 +85,12 @@ public class TridentMapExample {
                 .flatMap(split)
                 .map(toUpper)
                 .filter(theFilter)
+                .peek(new Consumer() {
+                    @Override
+                    public void accept(TridentTuple input) {
+                        System.out.println(input.getString(0));
+                    }
+                })
                 .groupBy(new Fields("word"))
                 .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                 .parallelismHint(16);

http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index dffc984..7c6d93f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -21,8 +21,10 @@ import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.MapFunction;
+import org.apache.storm.trident.operation.impl.ConsumerExecutor;
 import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
 import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
 import org.apache.storm.trident.planner.processor.MapProcessor;
@@ -387,6 +389,25 @@ public class Stream implements IAggregatableStream {
                                                 new MapProcessor(getOutputFields(), new FlatMapFunctionExecutor(function))));
     }
 
+    /**
+     * Returns a stream consisting of the trident tuples of this stream, additionally performing the provided action on
+     * each trident tuple as it is consumed from the resulting stream. This is mostly useful for debugging
+     * to see the tuples as they flow past a certain point in a pipeline.
+     *
+     * @param action the action to perform on each trident tuple as it is consumed from the stream
+     * @return the new stream
+     */
+    public Stream peek(Consumer action) {
+        projectionValidation(getOutputFields());
+        return _topology.addSourcedNode(this,
+                                        new ProcessorNode(
+                                                _topology.getUniqueStreamId(),
+                                                _name,
+                                                getOutputFields(),
+                                                getOutputFields(),
+                                                new MapProcessor(getOutputFields(), new ConsumerExecutor(action))));
+    }
+
     public ChainedAggregatorDeclarer chainedAgg() {
         return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java
new file mode 100644
index 0000000..dd13b48
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Consumer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+/**
+ * Represents an operation that accepts a single input argument and returns no result.
+ * This is similar to the Consumer interface in Java 8.
+ */
+public interface Consumer extends Serializable {
+    /**
+     * Performs the operation on the input trident tuple.
+     *
+     * @param input the input trident tuple
+     */
+    void accept(TridentTuple input);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/96bde69a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java
new file mode 100644
index 0000000..c08a8f7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/impl/ConsumerExecutor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation.impl;
+
+import org.apache.storm.trident.operation.BaseOperation;
+import org.apache.storm.trident.operation.Consumer;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class ConsumerExecutor extends BaseOperation implements Function {
+    private final Consumer consumer;
+
+    public ConsumerExecutor(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+        consumer.accept(tuple);
+        collector.emit(tuple);
+    }
+}


[2/3] storm git commit: Merge branch 'STORM-1517-1.x' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-1517-1.x' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dba93672
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dba93672
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dba93672

Branch: refs/heads/1.x-branch
Commit: dba93672b6d05670785a61d343a4f99c79ee5846
Parents: 7090be4 96bde69
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 5 21:53:49 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 21:53:49 2016 +0900

----------------------------------------------------------------------
 .../starter/trident/TridentMapExample.java      |  7 ++++
 .../jvm/org/apache/storm/trident/Stream.java    | 21 +++++++++++
 .../storm/trident/operation/Consumer.java       | 35 ++++++++++++++++++
 .../operation/impl/ConsumerExecutor.java        | 38 ++++++++++++++++++++
 4 files changed, 101 insertions(+)
----------------------------------------------------------------------



[3/3] storm git commit: add STORM-1517 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1517 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef7581b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef7581b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef7581b1

Branch: refs/heads/1.x-branch
Commit: ef7581b19ff7fd94f8183d65c94830d49281d6fa
Parents: dba9367
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 5 21:55:02 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 21:55:02 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ef7581b1/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index df1ddb6..56529ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1517: Add peek api in trident stream
  * STORM-1455: kafka spout should not reset to the beginning of partition when offsetoutofrange exception occurs
  * STORM-1518: Backport of STORM-1504
  * STORM-1505: Add map, flatMap and filter functions in trident stream