You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/27 01:13:12 UTC

[1/2] storm git commit: STORM-2490: support user defined output fields

Repository: storm
Updated Branches:
  refs/heads/master 60da333d2 -> 27ff5851f


STORM-2490: support user defined output fields


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce58ae53
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce58ae53
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce58ae53

Branch: refs/heads/master
Commit: ce58ae5388e5d8c60a511e239ecb57bdbfc17437
Parents: 4d8efae
Author: vesense <be...@163.com>
Authored: Wed Apr 26 17:20:52 2017 +0800
Committer: vesense <be...@163.com>
Committed: Wed Apr 26 17:59:45 2017 +0800

----------------------------------------------------------------------
 .../apache/storm/starter/LambdaTopology.java    |  5 ++--
 .../apache/storm/lambda/AbstractLambdaBolt.java | 30 --------------------
 .../storm/lambda/LambdaBiConsumerBolt.java      | 14 +++++++--
 .../apache/storm/lambda/LambdaConsumerBolt.java |  8 +++++-
 .../org/apache/storm/lambda/LambdaSpout.java    |  4 +--
 .../apache/storm/topology/TopologyBuilder.java  | 14 +++++----
 6 files changed, 32 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
index 66307ef..61b02db 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
@@ -44,12 +44,13 @@ public class LambdaTopology extends ConfigurableTopology {
         // (or it will cause not serializable exception).
         Prefix prefix = new Prefix("Hello lambda:");
         String suffix = ":so cool!";
+        int tag = 999;
 
         builder.setSpout("spout1", () -> UUID.randomUUID().toString());
         builder.setBolt("bolt1", (tuple, collector) -> {
             String[] parts = tuple.getStringByField("lambda").split("\\-");
-            collector.emit(new Values(prefix + parts[0] + suffix));
-        }).shuffleGrouping("spout1");
+            collector.emit(new Values(prefix + parts[0] + suffix, tag));
+        }, "strValue", "intValue").shuffleGrouping("spout1");
         builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");
 
         Config conf = new Config();

http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java
deleted file mode 100644
index 3ecc457..0000000
--- a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.lambda;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-
-public abstract class AbstractLambdaBolt extends BaseBasicBolt {
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("lambda"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
index 96d48a2..7e7de9c 100644
--- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
@@ -18,14 +18,20 @@
 package org.apache.storm.lambda;
 
 import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 
-public class LambdaBiConsumerBolt extends AbstractLambdaBolt {
+public class LambdaBiConsumerBolt extends BaseBasicBolt {
 
     private SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer;
 
-    public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) {
+    private String[] fields;
+
+    public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, String[] fields) {
         this.biConsumer = biConsumer;
+        this.fields = fields;
     }
 
     @Override
@@ -33,4 +39,8 @@ public class LambdaBiConsumerBolt extends AbstractLambdaBolt {
         biConsumer.accept(input, collector);
     }
 
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fields));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
index 29bb32e..d9114ed 100644
--- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
@@ -18,9 +18,11 @@
 package org.apache.storm.lambda;
 
 import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Tuple;
 
-public class LambdaConsumerBolt extends AbstractLambdaBolt {
+public class LambdaConsumerBolt extends BaseBasicBolt {
 
     private SerializableConsumer<Tuple> consumer;
 
@@ -33,4 +35,8 @@ public class LambdaConsumerBolt extends AbstractLambdaBolt {
         consumer.accept(input);
     }
 
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // this bolt dosen't emit to downstream bolts
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
index 51593b5..6d0ba3a 100644
--- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
@@ -27,10 +27,10 @@ import org.apache.storm.tuple.Values;
 import java.util.Map;
 
 public class LambdaSpout extends BaseRichSpout {
-    private SerializableSupplier<Object> supplier;
+    private SerializableSupplier<?> supplier;
     private SpoutOutputCollector collector;
 
-    public LambdaSpout(SerializableSupplier<Object> supplier) {
+    public LambdaSpout(SerializableSupplier<?> supplier) {
         this.supplier = supplier;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 23c5538..d8d8711 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -329,11 +329,12 @@ public class TopologyBuilder {
      *
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param biConsumer lambda expression that implements tuple processing for this bolt
+     * @param fields fields for tuple that should be emitted to downstream bolts
      * @return use the returned object to declare the inputs to this component
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
      */
-    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) throws IllegalArgumentException {
-        return setBolt(id, biConsumer, null);
+    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, String... fields) throws IllegalArgumentException {
+        return setBolt(id, biConsumer, null, fields);
     }
 
     /**
@@ -344,12 +345,13 @@ public class TopologyBuilder {
      *
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param biConsumer lambda expression that implements tuple processing for this bolt
+     * @param fields fields for tuple that should be emitted to downstream bolts
      * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
      * @return use the returned object to declare the inputs to this component
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
      */
-    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, Number parallelism_hint) throws IllegalArgumentException {
-        return setBolt(id, new LambdaBiConsumerBolt(biConsumer), parallelism_hint);
+    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, Number parallelism_hint, String... fields) throws IllegalArgumentException {
+        return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelism_hint);
     }
 
     /**
@@ -427,7 +429,7 @@ public class TopologyBuilder {
      * @param supplier lambda expression that implements tuple generating for this spout
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
      */
-    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
+    public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException {
         return setSpout(id, supplier, null);
     }
 
@@ -441,7 +443,7 @@ public class TopologyBuilder {
      * @param supplier lambda expression that implements tuple generating for this spout
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
      */
-    public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier, Number parallelism_hint) throws IllegalArgumentException {
+    public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelism_hint) throws IllegalArgumentException {
         return setSpout(id, new LambdaSpout(supplier), parallelism_hint);
     }
 


[2/2] storm git commit: Merge branch 'STORM-2490-lambda' of https://github.com/vesense/storm

Posted by xi...@apache.org.
Merge branch 'STORM-2490-lambda' of https://github.com/vesense/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/27ff5851
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/27ff5851
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/27ff5851

Branch: refs/heads/master
Commit: 27ff5851fd2b6281e3a6be07c239bec632e70113
Parents: 60da333 ce58ae5
Author: vesense <be...@163.com>
Authored: Thu Apr 27 09:12:13 2017 +0800
Committer: vesense <be...@163.com>
Committed: Thu Apr 27 09:12:13 2017 +0800

----------------------------------------------------------------------
 .../apache/storm/starter/LambdaTopology.java    |  5 ++--
 .../apache/storm/lambda/AbstractLambdaBolt.java | 30 --------------------
 .../storm/lambda/LambdaBiConsumerBolt.java      | 14 +++++++--
 .../apache/storm/lambda/LambdaConsumerBolt.java |  8 +++++-
 .../org/apache/storm/lambda/LambdaSpout.java    |  4 +--
 .../apache/storm/topology/TopologyBuilder.java  | 14 +++++----
 6 files changed, 32 insertions(+), 43 deletions(-)
----------------------------------------------------------------------