You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/27 01:13:12 UTC
[1/2] storm git commit: STORM-2490: support user defined output fields
Repository: storm
Updated Branches:
refs/heads/master 60da333d2 -> 27ff5851f
STORM-2490: support user defined output fields
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce58ae53
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce58ae53
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce58ae53
Branch: refs/heads/master
Commit: ce58ae5388e5d8c60a511e239ecb57bdbfc17437
Parents: 4d8efae
Author: vesense <be...@163.com>
Authored: Wed Apr 26 17:20:52 2017 +0800
Committer: vesense <be...@163.com>
Committed: Wed Apr 26 17:59:45 2017 +0800
----------------------------------------------------------------------
.../apache/storm/starter/LambdaTopology.java | 5 ++--
.../apache/storm/lambda/AbstractLambdaBolt.java | 30 --------------------
.../storm/lambda/LambdaBiConsumerBolt.java | 14 +++++++--
.../apache/storm/lambda/LambdaConsumerBolt.java | 8 +++++-
.../org/apache/storm/lambda/LambdaSpout.java | 4 +--
.../apache/storm/topology/TopologyBuilder.java | 14 +++++----
6 files changed, 32 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
index 66307ef..61b02db 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
@@ -44,12 +44,13 @@ public class LambdaTopology extends ConfigurableTopology {
// (or it will cause not serializable exception).
Prefix prefix = new Prefix("Hello lambda:");
String suffix = ":so cool!";
+ int tag = 999;
builder.setSpout("spout1", () -> UUID.randomUUID().toString());
builder.setBolt("bolt1", (tuple, collector) -> {
String[] parts = tuple.getStringByField("lambda").split("\\-");
- collector.emit(new Values(prefix + parts[0] + suffix));
- }).shuffleGrouping("spout1");
+ collector.emit(new Values(prefix + parts[0] + suffix, tag));
+ }, "strValue", "intValue").shuffleGrouping("spout1");
builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");
Config conf = new Config();
http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java
deleted file mode 100644
index 3ecc457..0000000
--- a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.lambda;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-
-public abstract class AbstractLambdaBolt extends BaseBasicBolt {
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("lambda"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
index 96d48a2..7e7de9c 100644
--- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java
@@ -18,14 +18,20 @@
package org.apache.storm.lambda;
import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
-public class LambdaBiConsumerBolt extends AbstractLambdaBolt {
+public class LambdaBiConsumerBolt extends BaseBasicBolt {
private SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer;
- public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) {
+ private String[] fields;
+
+ public LambdaBiConsumerBolt(SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, String[] fields) {
this.biConsumer = biConsumer;
+ this.fields = fields;
}
@Override
@@ -33,4 +39,8 @@ public class LambdaBiConsumerBolt extends AbstractLambdaBolt {
biConsumer.accept(input, collector);
}
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fields));
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
index 29bb32e..d9114ed 100644
--- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java
@@ -18,9 +18,11 @@
package org.apache.storm.lambda;
import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
-public class LambdaConsumerBolt extends AbstractLambdaBolt {
+public class LambdaConsumerBolt extends BaseBasicBolt {
private SerializableConsumer<Tuple> consumer;
@@ -33,4 +35,8 @@ public class LambdaConsumerBolt extends AbstractLambdaBolt {
consumer.accept(input);
}
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // this bolt dosen't emit to downstream bolts
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
index 51593b5..6d0ba3a 100644
--- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java
@@ -27,10 +27,10 @@ import org.apache.storm.tuple.Values;
import java.util.Map;
public class LambdaSpout extends BaseRichSpout {
- private SerializableSupplier<Object> supplier;
+ private SerializableSupplier<?> supplier;
private SpoutOutputCollector collector;
- public LambdaSpout(SerializableSupplier<Object> supplier) {
+ public LambdaSpout(SerializableSupplier<?> supplier) {
this.supplier = supplier;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ce58ae53/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 23c5538..d8d8711 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -329,11 +329,12 @@ public class TopologyBuilder {
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param biConsumer lambda expression that implements tuple processing for this bolt
+ * @param fields fields for tuple that should be emitted to downstream bolts
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
- public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer) throws IllegalArgumentException {
- return setBolt(id, biConsumer, null);
+ public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, String... fields) throws IllegalArgumentException {
+ return setBolt(id, biConsumer, null, fields);
}
/**
@@ -344,12 +345,13 @@ public class TopologyBuilder {
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param biConsumer lambda expression that implements tuple processing for this bolt
+ * @param fields fields for tuple that should be emitted to downstream bolts
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
- public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, Number parallelism_hint) throws IllegalArgumentException {
- return setBolt(id, new LambdaBiConsumerBolt(biConsumer), parallelism_hint);
+ public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple,BasicOutputCollector> biConsumer, Number parallelism_hint, String... fields) throws IllegalArgumentException {
+ return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelism_hint);
}
/**
@@ -427,7 +429,7 @@ public class TopologyBuilder {
* @param supplier lambda expression that implements tuple generating for this spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
- public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier) throws IllegalArgumentException {
+ public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException {
return setSpout(id, supplier, null);
}
@@ -441,7 +443,7 @@ public class TopologyBuilder {
* @param supplier lambda expression that implements tuple generating for this spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
- public SpoutDeclarer setSpout(String id, SerializableSupplier<Object> supplier, Number parallelism_hint) throws IllegalArgumentException {
+ public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelism_hint) throws IllegalArgumentException {
return setSpout(id, new LambdaSpout(supplier), parallelism_hint);
}
[2/2] storm git commit: Merge branch 'STORM-2490-lambda' of
https://github.com/vesense/storm
Posted by xi...@apache.org.
Merge branch 'STORM-2490-lambda' of https://github.com/vesense/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/27ff5851
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/27ff5851
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/27ff5851
Branch: refs/heads/master
Commit: 27ff5851fd2b6281e3a6be07c239bec632e70113
Parents: 60da333 ce58ae5
Author: vesense <be...@163.com>
Authored: Thu Apr 27 09:12:13 2017 +0800
Committer: vesense <be...@163.com>
Committed: Thu Apr 27 09:12:13 2017 +0800
----------------------------------------------------------------------
.../apache/storm/starter/LambdaTopology.java | 5 ++--
.../apache/storm/lambda/AbstractLambdaBolt.java | 30 --------------------
.../storm/lambda/LambdaBiConsumerBolt.java | 14 +++++++--
.../apache/storm/lambda/LambdaConsumerBolt.java | 8 +++++-
.../org/apache/storm/lambda/LambdaSpout.java | 4 +--
.../apache/storm/topology/TopologyBuilder.java | 14 +++++----
6 files changed, 32 insertions(+), 43 deletions(-)
----------------------------------------------------------------------