You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/21 19:36:06 UTC
[3/4] flink git commit: [FLINK-2306] Add support for named streams in
Storm compatibility layer - enabled .declareStream() and connect via stream
name - enabled multiple output streams - added .split() / .select() / strip
pattern - added helpers in n
[FLINK-2306] Add support for named streams in Storm compatibility layer
- enabled .declareStream() and connect via stream name
- enabled multiple output streams
- added .split() / .select() / strip pattern
- added helpers in new package utils
- adapted and extended JUnit tests
- adapted examples
some minor improvements (FlinkClient, integration of Tuple0)
This closes #1011
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a830299
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a830299
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a830299
Branch: refs/heads/master
Commit: 3a8302998d2fc7f38504c238916bc7d0dada2320
Parents: a82bd43
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Thu Aug 13 08:56:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 21 19:35:39 2015 +0200
----------------------------------------------------------------------
docs/apis/storm_compatibility.md | 40 +++-
.../flink-storm-compatibility-core/README.md | 1 -
.../stormcompatibility/api/FlinkClient.java | 24 ++-
.../api/FlinkOutputFieldsDeclarer.java | 70 +++----
.../stormcompatibility/api/FlinkSubmitter.java | 2 -
.../api/FlinkTopologyBuilder.java | 182 +++++++++++++------
.../util/FlinkStormStreamSelector.java | 48 +++++
.../util/SplitStreamMapper.java | 39 ++++
.../util/SplitStreamType.java | 52 ++++++
.../wrappers/AbstractStormCollector.java | 118 +++++++-----
.../wrappers/AbstractStormSpoutWrapper.java | 41 +++--
.../wrappers/FiniteStormSpoutWrapper.java | 85 +++++----
.../wrappers/StormBoltCollector.java | 21 ++-
.../wrappers/StormBoltWrapper.java | 136 ++++++++++----
.../wrappers/StormFiniteSpoutWrapper.java | 138 +++++++++-----
.../wrappers/StormOutputFieldsDeclarer.java | 31 +---
.../wrappers/StormSpoutCollector.java | 24 +--
.../wrappers/StormSpoutWrapper.java | 68 +++++--
.../wrappers/StormWrapperSetupHelper.java | 66 ++++---
.../api/FlinkOutputFieldsDeclarerTest.java | 113 +++++++-----
.../api/FlinkTopologyBuilderTest.java | 48 +++++
.../flink/stormcompatibility/api/TestBolt.java | 48 +++++
.../flink/stormcompatibility/api/TestSpout.java | 59 ++++++
.../wrappers/FlinkStormStreamSelectorTest.java | 51 ++++++
.../wrappers/StormBoltCollectorTest.java | 26 +--
.../wrappers/StormBoltWrapperTest.java | 118 ++++++++++--
.../wrappers/StormOutputFieldsDeclarerTest.java | 37 ++--
.../wrappers/StormSpoutCollectorTest.java | 22 ++-
.../wrappers/StormTupleTest.java | 2 +-
.../wrappers/StormWrapperSetupHelperTest.java | 47 +++--
.../flink-storm-compatibility-examples/pom.xml | 3 +
.../excamation/ExclamationWithStormBolt.java | 7 +-
.../excamation/ExclamationWithStormSpout.java | 12 +-
.../split/SpoutSplitExample.java | 102 +++++++++++
.../split/stormoperators/RandomSpout.java | 76 ++++++++
.../stormoperators/VerifyAndEnrichBolt.java | 61 +++++++
.../wordcount/SpoutSourceWordCount.java | 9 +-
.../split/BoltSplitITCase.java | 28 +++
.../stormcompatibility/split/SplitBolt.java | 61 +++++++
.../split/SplitBoltTopology.java | 87 +++++++++
.../split/SplitSpoutTopology.java | 85 +++++++++
.../split/SpoutSplitITCase.java | 28 +++
.../split/StormSplitStreamBoltLocal.java | 51 ++++++
.../split/StormSplitStreamSpoutLocal.java | 51 ++++++
44 files changed, 1918 insertions(+), 500 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index b38667b..3a0c025 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -49,7 +49,9 @@ Add the following dependency to your `pom.xml` if you want to execute Storm code
</dependency>
~~~
-**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution. Thus, you need to include `flink-storm-compatiblitly-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
+**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution.
+Thus, you need to include `flink-storm-compatibility-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
+See *WordCount Storm* within `flink-storm-compatibility-examples/pom.xml` for an example of how to package a jar correctly.
# Execute Storm Topologies
@@ -93,7 +95,7 @@ if(runLocal) { // submit to test cluster
As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
The Storm compatibility layer offers a wrapper classes for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
-Per default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
+Per default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple0` to `Tuple25` according to the number of fields of the Storm tuples).
For single field output tuples a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).
Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
@@ -112,7 +114,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
// stream has `raw` type (single field output streams only)
DataStream<String> rawInput = env.addSource(
- new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), true), // Spout source, 'true' for raw type
+ new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
TypeExtractor.getForClass(String.class)); // output type
// process data stream
@@ -167,6 +169,38 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
+## Multiple Output Streams
+
+Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
+If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
+For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitDataStream.select(...)`.
+Flink provides the predefined output selector `FlinkStormStreamSelector<T>` for `.split(...)` already.
+Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
+If a data stream of type `SplitStreamType<T>` is used as input for a Bolt, `SplitStreamType<T>` must **not** be removed – `StormBoltWrapper` removes it automatically.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+[...]
+
+// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
+DataStream<SplitStreamType<SomeType>> multiStream = ...
+
+SplitDataStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new FlinkStormStreamSelector<SomeType>());
+
+// remove the SplitStreamType wrapper (using SplitStreamMapper) to get a data stream of type SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+// apply Bolt directly, without stripping the SplitStreamType wrapper
+DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt for further processing */);
+
+// do further processing on s1 and s2
+[...]
+~~~
+</div>
+</div>
+
+See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
+
# Flink Extensions
## Finite Storm Spouts
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index 04d8934..aef4847 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -5,7 +5,6 @@ The Storm compatibility layer allows to embed spouts or bolt unmodified within a
The following Strom features are not (yet/fully) supported by the compatibility layer right now:
* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
* topology and tuple meta information (ie, `TopologyContext` not fully supported)
-* only default stream is supported currently (ie, only a single output stream)
* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
* for whole Storm topologies the following is not supported by Flink:
* direct emit connection pattern
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 5a6e8ca..51a4fa1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -30,6 +30,7 @@ import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -64,6 +66,9 @@ import java.util.Map;
*/
public class FlinkClient {
+ /** The client's configuration */
+ @SuppressWarnings("unused")
+ private final Map<?,?> conf;
/** The jobmanager's host name */
private final String jobManagerHost;
/** The jobmanager's rpc port */
@@ -77,19 +82,24 @@ public class FlinkClient {
* Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
* Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
*
+ * @param conf
+ * A configuration.
* @param host
* The jobmanager's host name.
* @param port
* The jobmanager's rpc port.
*/
- public FlinkClient(final String host, final int port) {
- this(host, port, null);
+ @SuppressWarnings("rawtypes")
+ public FlinkClient(final Map conf, final String host, final int port) {
+ this(conf, host, port, null);
}
/**
* Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
* Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
*
+ * @param conf
+ * A configuration.
* @param host
* The jobmanager's host name.
* @param port
@@ -97,7 +107,9 @@ public class FlinkClient {
* @param timeout
* Timeout
*/
- public FlinkClient(final String host, final int port, final Integer timeout) {
+ @SuppressWarnings("rawtypes")
+ public FlinkClient(final Map conf, final String host, final int port, final Integer timeout) {
+ this.conf = conf;
this.jobManagerHost = host;
this.jobManagerPort = port;
if (timeout != null) {
@@ -119,7 +131,7 @@ public class FlinkClient {
public static FlinkClient getConfiguredClient(final Map conf) {
final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
- return new FlinkClient(nimbusHost, nimbusPort);
+ return new FlinkClient(conf, nimbusHost, nimbusPort);
}
/**
@@ -133,7 +145,7 @@ public class FlinkClient {
return this;
}
- public void close() {/* nothing to do */}
+ // The following methods are derived from "backtype.storm.generated.Nimbus.Client"
/**
* Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
@@ -220,6 +232,8 @@ public class FlinkClient {
}
}
+ // Flink specific additional methods
+
/**
* Package internal method to get a Flink {@link JobID} from a Storm topology name.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
index 49d73f8..e2f6332 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -20,10 +20,12 @@ package org.apache.flink.stormcompatibility.api;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import java.util.HashMap;
import java.util.List;
/**
@@ -36,8 +38,8 @@ import java.util.List;
*/
final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
- /** the declared output schema */
- Fields outputSchema;
+ /** the declared output streams and schemas */
+ final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
@Override
public void declare(final Fields fields) {
@@ -57,15 +59,6 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
}
- /**
- * {@inheritDoc}
- * <p/>
- * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equals to
- * {@link Utils#DEFAULT_STREAM_ID}.
- *
- * @throws UnsupportedOperationException
- * if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
- */
@Override
public void declareStream(final String streamId, final Fields fields) {
this.declareStream(streamId, false, fields);
@@ -74,40 +67,45 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
/**
* {@inheritDoc}
* <p/>
- * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equals to
- * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is no supported by Flink and parameter {@code direct}
- * must be {@code false}.
+ * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
*
* @throws UnsupportedOperationException
- * if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
+ * if {@code direct} is {@code true}
*/
@Override
public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
- throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
- }
if (direct) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}
- this.outputSchema = fields;
+ this.outputStreams.put(streamId, fields);
}
/**
- * Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
- * {@code null} is returned.
- *
- * @return output type information for the declared output schema; or {@code null} if no output schema was declared
+ * Returns {@link TypeInformation} for the declared output schema for a specific stream.
+ *
+ * @param streamId
+ * A stream ID.
+ *
+ * @return output type information for the declared output schema of the specified stream; or {@code null} if
+ * {@code streamId == null}
+ *
* @throws IllegalArgumentException
- * if more then 25 attributes are declared
+ * If no output schema was declared for the specified stream or if more than 25 attributes were declared.
*/
- public TypeInformation<?> getOutputType() throws IllegalArgumentException {
- if ((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
+ public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
+ if (streamId == null) {
return null;
}
+ Fields outputSchema = this.outputStreams.get(streamId);
+ if (outputSchema == null) {
+ throw new IllegalArgumentException("Stream with ID '" + streamId
+ + "' was not declared.");
+ }
+
Tuple t;
- final int numberOfAttributes = this.outputSchema.size();
+ final int numberOfAttributes = outputSchema.size();
if (numberOfAttributes == 1) {
return TypeExtractor.getForClass(Object.class);
@@ -148,16 +146,22 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
}
/**
- * Computes the indexes within the declared output schema, for a list of given field-grouping attributes.
- *
- * @return array of {@code int}s that contains the index without the output schema for each attribute in the given
- * list
+ * Computes the indexes within the declared output schema of the specified stream, for a list of given
+ * field-grouping attributes.
+ *
+ * @param streamId
+ * A stream ID.
+ * @param groupingFields
+ * The names of the key fields.
+ *
+ * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
+ * list
*/
- public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
+ public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
final int[] fieldIndexes = new int[groupingFields.size()];
for (int i = 0; i < fieldIndexes.length; ++i) {
- fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
+ fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
}
return fieldIndexes;
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index 819dbbc..bcc2afb 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -125,8 +125,6 @@ public class FlinkSubmitter {
} catch (final AlreadyAliveException e) {
logger.warn("Topology already alive exception", e);
throw e;
- } finally {
- client.close();
}
logger.info("Finished submitting topology: " + name);
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index d146250..a739c23 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -33,6 +33,9 @@ import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
@@ -41,6 +44,7 @@ import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
import java.util.HashMap;
import java.util.HashSet;
@@ -54,8 +58,7 @@ import java.util.Set;
* topology. Most methods (except {@link #createTopology()} are copied from the original {@link TopologyBuilder}
* implementation to ensure equal behavior.<br />
* <br />
- * <strong>CAUTION: {@link IRichStateSpout StateSpout}s and multiple output streams per spout/bolt are currently not
- * supported.</strong>
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
*/
public class FlinkTopologyBuilder {
@@ -65,13 +68,13 @@ public class FlinkTopologyBuilder {
private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
/** All user bolts by their ID */
private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
- /** All declared output schemas by operator ID */
- private final HashMap<String, Fields> outputSchemas = new HashMap<String, Fields>();
+ /** All declared streams and output schemas by operator ID */
+ private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
/** All spouts&bolts declarers by their ID */
private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
/**
- * Creates a Flink program that used the specified spouts and bolts.
+ * Creates a Flink program that uses the specified spouts and bolts.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public FlinkTopology createTopology() {
@@ -79,8 +82,7 @@ public class FlinkTopologyBuilder {
final FlinkTopology env = new FlinkTopology(stormTopolgoy);
env.setParallelism(1);
- final HashMap<String, SingleOutputStreamOperator> availableOperators =
- new HashMap<String, SingleOutputStreamOperator>();
+ final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
final String spoutId = spout.getKey();
@@ -88,14 +90,10 @@ public class FlinkTopologyBuilder {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userSpout.declareOutputFields(declarer);
- this.outputSchemas.put(spoutId, declarer.outputSchema);
+ final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
+ this.outputStreams.put(spoutId, sourceStreams);
declarers.put(spoutId, declarer);
- /* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
- * and StormCollector)
- * -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
- * the streams
- */
AbstractStormSpoutWrapper spoutWrapper;
if (userSpout instanceof FiniteStormSpout) {
@@ -104,8 +102,23 @@ public class FlinkTopologyBuilder {
spoutWrapper = new StormSpoutWrapper(userSpout);
}
- final DataStreamSource source = env.addSource(spoutWrapper, declarer.getOutputType());
- availableOperators.put(spoutId, source);
+ DataStreamSource source;
+ HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
+ if (sourceStreams.size() == 1) {
+ final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
+ source = env.addSource(spoutWrapper, spoutId,
+ declarer.getOutputType(outputStreamId));
+ outputStreams.put(outputStreamId, source);
+ } else {
+ source = env.addSource(spoutWrapper, spoutId,
+ TypeExtractor.getForClass(SplitStreamType.class));
+ SplitDataStream splitSource = source.split(new FlinkStormStreamSelector());
+
+ for (String streamId : sourceStreams.keySet()) {
+ outputStreams.put(streamId, splitSource.select(streamId));
+ }
+ }
+ availableInputs.put(spoutId, outputStreams);
int dop = 1;
final ComponentCommon common = stormTopolgoy.get_spouts().get(spoutId).get_common();
@@ -126,7 +139,14 @@ public class FlinkTopologyBuilder {
* its producer
* ->thus, we might need to repeat multiple times
*/
+ boolean makeProgress = true;
while (unprocessedBolts.size() > 0) {
+ if (!makeProgress) {
+ throw new RuntimeException(
+ "Unable to build Topology. Could not connect the following bolts: "
+ + unprocessedBolts.keySet());
+ }
+ makeProgress = false;
final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
while (boltsIterator.hasNext()) {
@@ -135,11 +155,6 @@ public class FlinkTopologyBuilder {
final String boltId = bolt.getKey();
final IRichBolt userBolt = bolt.getValue();
- final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
- userBolt.declareOutputFields(declarer);
- this.outputSchemas.put(boltId, declarer.outputSchema);
- declarers.put(boltId, declarer);
-
final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
@@ -153,51 +168,98 @@ public class FlinkTopologyBuilder {
final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
while (inputStreamsIterator.hasNext()) {
- final Entry<GlobalStreamId, Grouping> inputStream = inputStreamsIterator.next();
- final String producerId = inputStream.getKey().get_componentId();
-
- DataStream<?> inputDataStream = availableOperators.get(producerId);
-
- if (inputDataStream != null) {
- // if producer was processed already
- final Grouping grouping = inputStream.getValue();
- if (grouping.is_set_shuffle()) {
- // Storm uses a round-robin shuffle strategy
- inputDataStream = inputDataStream.rebalance();
- } else if (grouping.is_set_fields()) {
- // global grouping is emulated in Storm via an empty fields grouping list
- final List<String> fields = grouping.get_fields();
- if (fields.size() > 0) {
- FlinkOutputFieldsDeclarer procDeclarer = this.declarers.get(producerId);
- inputDataStream = inputDataStream.groupBy(procDeclarer.getGroupingFieldIndexes(grouping
- .get_fields()));
- } else {
- inputDataStream = inputDataStream.global();
+ final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next();
+ final String producerId = stormInputStream.getKey().get_componentId();
+ final String inputStreamId = stormInputStream.getKey().get_streamId();
+
+ HashMap<String, DataStream> producer = availableInputs.get(producerId);
+ if (producer != null) {
+ makeProgress = true;
+
+ DataStream inputStream = producer.get(inputStreamId);
+ if (inputStream != null) {
+ final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+ userBolt.declareOutputFields(declarer);
+ final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams;
+ this.outputStreams.put(boltId, boltOutputStreams);
+ this.declarers.put(boltId, declarer);
+
+ // if producer was processed already
+ final Grouping grouping = stormInputStream.getValue();
+ if (grouping.is_set_shuffle()) {
+ // Storm uses a round-robin shuffle strategy
+ inputStream = inputStream.rebalance();
+ } else if (grouping.is_set_fields()) {
+ // global grouping is emulated in Storm via an empty fields grouping list
+ final List<String> fields = grouping.get_fields();
+ if (fields.size() > 0) {
+ FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
+ inputStream = inputStream.groupBy(prodDeclarer
+ .getGroupingFieldIndexes(inputStreamId,
+ grouping.get_fields()));
+ } else {
+ inputStream = inputStream.global();
+ }
+ } else if (grouping.is_set_all()) {
+ inputStream = inputStream.broadcast();
+ } else if (!grouping.is_set_local_or_shuffle()) {
+ throw new UnsupportedOperationException(
+ "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
}
- } else if (grouping.is_set_all()) {
- inputDataStream = inputDataStream.broadcast();
- } else if (!grouping.is_set_local_or_shuffle()) {
- throw new UnsupportedOperationException(
- "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
- }
- final TypeInformation<?> outType = declarer.getOutputType();
+ SingleOutputStreamOperator outputStream;
+ if (boltOutputStreams.size() < 2) { // single output stream or sink
+ String outputStreamId = null;
+ if (boltOutputStreams.size() == 1) {
+ outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
+ }
+ final TypeInformation<?> outType = declarer
+ .getOutputType(outputStreamId);
+
+ outputStream = inputStream.transform(
+ boltId,
+ outType,
+ new StormBoltWrapper(userBolt, this.outputStreams.get(
+ producerId).get(inputStreamId)));
+
+ if (outType != null) {
+ // only for non-sink nodes
+ HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+ op.put(outputStreamId, outputStream);
+ availableInputs.put(boltId, op);
+ }
+ } else {
+ final TypeInformation<?> outType = TypeExtractor
+ .getForClass(SplitStreamType.class);
+
+ outputStream = inputStream.transform(
+ boltId,
+ outType,
+ new StormBoltWrapper(userBolt, this.outputStreams.get(
+ producerId).get(inputStreamId)));
+
+ SplitDataStream splitStreams = outputStream
+ .split(new FlinkStormStreamSelector());
+
+ HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+ for (String outputStreamId : boltOutputStreams.keySet()) {
+ op.put(outputStreamId, splitStreams.select(outputStreamId));
+ }
+ availableInputs.put(boltId, op);
+ }
- final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
- new StormBoltWrapper(userBolt, this.outputSchemas.get(producerId)));
- if (outType != null) {
- // only for non-sink nodes
- availableOperators.put(boltId, operator);
- }
+ int dop = 1;
+ if (common.is_set_parallelism_hint()) {
+ dop = common.get_parallelism_hint();
+ outputStream.setParallelism(dop);
+ }
+ env.increaseNumberOfTasks(dop);
- int dop = 1;
- if (common.is_set_parallelism_hint()) {
- dop = common.get_parallelism_hint();
- operator.setParallelism(dop);
+ inputStreamsIterator.remove();
+ } else {
+ throw new RuntimeException("Cannot connect '" + boltId + "' to '"
+ + producerId + "'. Stream '" + inputStreamId + "' not found.");
}
- env.increaseNumberOfTasks(dop);
-
- inputStreamsIterator.remove();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
new file mode 100644
index 0000000..7ca45d6
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
+/**
+ * {@link OutputSelector} used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink.
+ * For every {@link SplitStreamType} record, the selected output name is the record's {@code streamId}.
+ *
+ * @param <T>
+ *            The type of the value wrapped by the incoming {@link SplitStreamType} records.
+ */
+public final class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
+    private static final long serialVersionUID = 2553423379715401023L;
+
+    /** Per-stream-ID cache of single-element name lists; avoids allocating a short-lived list for every record. */
+    private final HashMap<String, List<String>> streams = new HashMap<String, List<String>>();
+
+    /**
+     * Returns the (cached) single-element list that holds the stream ID of the given record.
+     *
+     * @param value
+     *            The record for which the output name is selected.
+     * @return a list that contains only {@code value.streamId}
+     */
+    @Override
+    public Iterable<String> select(SplitStreamType<T> value) {
+        final String key = value.streamId;
+        final List<String> cached = this.streams.get(key);
+        if (cached != null) {
+            return cached;
+        }
+
+        // first record seen for this stream ID: build the selection once and remember it
+        final List<String> selection = new ArrayList<String>(1);
+        selection.add(key);
+        this.streams.put(key, selection);
+        return selection;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
new file mode 100644
index 0000000..afcdcae
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+
+/**
+ * {@link MapFunction} that strips the {@link SplitStreamType}{@code <T>} wrapper away, ie, returns the wrapped record
+ * of type {@code T}. Can be used to get a "clean" stream from a Spout/Bolt that declared multiple output streams
+ * (after the streams got separated using
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and
+ * {@link SplitDataStream#select(String...) .select(...)}).
+ *
+ * @param <T>
+ *            The type of the wrapped record.
+ */
+public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> {
+    private static final long serialVersionUID = 3550359150160908564L;
+
+    /**
+     * Unwraps a single record.
+     *
+     * @param value
+     *            The wrapped record.
+     * @return the content of {@code value.value}
+     */
+    @Override
+    public T map(SplitStreamType<T> value) throws Exception {
+        final T unwrapped = value.value;
+        return unwrapped;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
new file mode 100644
index 0000000..9c7e477
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Used by {@link org.apache.flink.stormcompatibility.wrappers.AbstractStormCollector AbstractStormCollector} to wrap
+ * output tuples if multiple output streams are declared. For this case, the Flink output data stream must be split via
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} using
+ * {@link FlinkStormStreamSelector}.
+ *
+ * @param <T>
+ *            The type of the wrapped data value.
+ */
+public class SplitStreamType<T> {
+
+    /** The stream ID this tuple belongs to. */
+    public String streamId;
+    /** The actual data value. */
+    public T value;
+
+    @Override
+    public String toString() {
+        return "<sid:" + this.streamId + ",v:" + this.value + ">";
+    }
+
+    /**
+     * Two instances are equal if they are of the same class and both their stream IDs and their values are equal.
+     * Both fields may be {@code null} (eg, for a freshly created instance); {@code null} only equals {@code null}.
+     *
+     * @param o
+     *            The object to compare with.
+     * @return {@code true} if {@code o} carries the same stream ID and value, {@code false} otherwise
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SplitStreamType<?> other = (SplitStreamType<?>) o;
+
+        return nullSafeEquals(this.streamId, other.streamId) && nullSafeEquals(this.value, other.value);
+    }
+
+    /**
+     * Computes a hash code from the same fields {@link #equals(Object)} compares, so that equal instances have equal
+     * hash codes.
+     *
+     * @return the combined hash of stream ID and value ({@code null} fields contribute zero)
+     */
+    @Override
+    public int hashCode() {
+        int result = (this.streamId == null) ? 0 : this.streamId.hashCode();
+        result = 31 * result + ((this.value == null) ? 0 : this.value.hashCode());
+        return result;
+    }
+
+    /** Compares two possibly-{@code null} references; avoids a dependency on {@code java.util.Objects}. */
+    private static boolean nullSafeEquals(Object a, Object b) {
+        return (a == null) ? (b == null) : a.equals(b);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
index 4a8fb7d..7b35a64 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
@@ -17,82 +17,114 @@
package org.apache.flink.stormcompatibility.wrappers;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+
+import java.util.HashMap;
import java.util.List;
+import java.util.Map.Entry;
/**
* A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples.
*/
abstract class AbstractStormCollector<OUT> {
+ /** Flink output tuple of concrete type {@link Tuple0} to {@link Tuple25} per output stream. */
+ protected final HashMap<String, Tuple> outputTuple = new HashMap<String, Tuple>();
+ /** Flink split tuple. Used, if multiple output streams are declared. */
+ private final SplitStreamType<Object> splitTuple = new SplitStreamType<Object>();
/**
- * Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}.
- */
- protected final Tuple outputTuple;
- /**
- * The number of attributes of the output tuples. (Determines the concrete type of
- * {@link #outputTuple}). If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not
- * used and "raw" data type is used.
- */
- protected final int numberOfAttributes;
- /**
- * Is set to {@code true} each time a tuple is emitted.
+ * The number of attributes of the output tuples per stream. (Determines the concrete type of {@link #outputTuple}).
+ * If the number of attributes of a stream is negative, {@link #outputTuple} is not used and "raw" data type is used.
*/
+ protected final HashMap<String, Integer> numberOfAttributes;
+ /** Indicates if multiple output streams are declared and thus {@link SplitStreamType} must be used as output. */
+ private final boolean split;
+ /** Is set to {@code true} each time a tuple is emitted. */
boolean tupleEmitted = false;
/**
- * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via
- * {@link #doEmit(Object)}. If the number of attributes is specified as zero, any output type is
- * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
- * to {@link Tuple25}.
+ * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via {@link #doEmit(Object)}. If the
+ * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
+ * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
*
* @param numberOfAttributes
- * The number of attributes of the emitted tuples.
+ * The number of attributes of the emitted tuples per output stream.
* @throws UnsupportedOperationException
- * if the specified number of attributes is not in the valid range of [0,25]
+ * if the specified number of attributes is greater than 25
*/
- public AbstractStormCollector(final int numberOfAttributes) throws UnsupportedOperationException {
+ public AbstractStormCollector(final HashMap<String, Integer> numberOfAttributes)
+ throws UnsupportedOperationException {
+ assert (numberOfAttributes != null);
+
this.numberOfAttributes = numberOfAttributes;
+ this.split = this.numberOfAttributes.size() > 1;
+
+ for (Entry<String, Integer> outputStream : numberOfAttributes.entrySet()) {
+ final int numAtt = outputStream.getValue();
+ assert (numAtt >= -1);
+
+ if (numAtt > 25) {
+ throw new UnsupportedOperationException(
+ "Flink cannot handle more then 25 attributes, but " + numAtt
+ + " are declared for stream '" + outputStream.getKey()
+ + "' by the given bolt");
+ } else if (numAtt >= 0) {
+ try {
+ this.outputTuple.put(outputStream.getKey(),
+ org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt)
+ .newInstance());
+ } catch (final InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (final IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
- if (this.numberOfAttributes <= 0) {
- this.outputTuple = null;
- } else if (this.numberOfAttributes <= 25) {
- try {
- this.outputTuple = org.apache.flink.api.java.tuple.Tuple
- .getTupleClass(this.numberOfAttributes).newInstance();
- } catch (final InstantiationException e) {
- throw new RuntimeException(e);
- } catch (final IllegalAccessException e) {
- throw new RuntimeException(e);
}
- } else {
- throw new UnsupportedOperationException(
- "Flink cannot handle more then 25 attributes, but "
- + this.numberOfAttributes + " are declared by the given bolt");
}
}
/**
- * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via
- * {@link #doEmit(Object)}.
+ * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
+ * to the specified output stream.
*
+ * @param streamId
+ * The output stream id.
* @param tuple
- * The Storm tuple to be emitted.
+ * The Storm tuple to be emitted.
* @return the return value of {@link #doEmit(Object)}
*/
@SuppressWarnings("unchecked")
- protected final List<Integer> transformAndEmit(final List<Object> tuple) {
+ protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
List<Integer> taskIds;
- if (this.numberOfAttributes > 0) {
- assert (tuple.size() == this.numberOfAttributes);
- for (int i = 0; i < this.numberOfAttributes; ++i) {
- this.outputTuple.setField(tuple.get(i), i);
+
+ final int numAtt = this.numberOfAttributes.get(streamId);
+ if (numAtt > -1) {
+ assert (tuple.size() == numAtt);
+ Tuple out = this.outputTuple.get(streamId);
+ for (int i = 0; i < numAtt; ++i) {
+ out.setField(tuple.get(i), i);
+ }
+ if (this.split) {
+ this.splitTuple.streamId = streamId;
+ this.splitTuple.value = out;
+
+ taskIds = doEmit((OUT) this.splitTuple);
+ } else {
+ taskIds = doEmit((OUT) out);
}
- taskIds = doEmit((OUT) this.outputTuple);
+
} else {
assert (tuple.size() == 1);
- taskIds = doEmit((OUT) tuple.get(0));
+ if (split) {
+ this.splitTuple.streamId = streamId;
+ this.splitTuple.value = tuple.get(0);
+
+ taskIds = doEmit((OUT) this.splitTuple);
+ } else {
+ taskIds = doEmit((OUT) tuple.get(0));
+ }
}
this.tupleEmitted = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 4e43a8a..62059fe 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -17,8 +17,13 @@
package org.apache.flink.stormcompatibility.wrappers;
+import java.util.Collection;
+import java.util.HashMap;
+
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -37,9 +42,9 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
private static final long serialVersionUID = 4993283609095408765L;
/**
- * Number of attributes of the bolt's output tuples.
+ * Number of attributes of the spout's output tuples per stream.
*/
- private final int numberOfAttributes;
+ private final HashMap<String, Integer> numberOfAttributes;
/**
* The wrapped Storm {@link IRichSpout spout}.
*/
@@ -55,38 +60,40 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
/**
* Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
- * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple1} to
+ * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
* {@link Tuple25} depending on the spout's declared number of attributes.
*
* @param spout
* The Storm {@link IRichSpout spout} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not within range [0;25].
*/
public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
- this(spout, false);
+ this(spout, null);
}
/**
* Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
* that it can be used within a Flink streaming program. The output type can be any type if parameter
* {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
- * {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
+ * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
* number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * The Storm {@link IRichSpout spout} to be used.
+ * @param rawOutputs
+ * Contains the names of single-attribute output streams that should not be of type {@link Tuple1} but
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If a stream listed in {@code rawOutputs} does not declare exactly one output attribute, or if the
+ * number of declared output attributes of any other stream is not within range
+ * [0;25].
*/
- public AbstractStormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
+ public AbstractStormSpoutWrapper(final IRichSpout spout,
+ final Collection<String> rawOutputs)
+ throws IllegalArgumentException {
this.spout = spout;
- this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutput);
+ this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
}
@Override
@@ -94,7 +101,7 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
this.spout.open(null,
StormWrapperSetupHelper
- .convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
+ .convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
new SpoutOutputCollector(this.collector));
this.spout.activate();
this.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
index 7913510..1912afc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
@@ -17,6 +17,14 @@
package org.apache.flink.stormcompatibility.wrappers;
+import java.util.Collection;
+
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+
+import com.google.common.collect.Sets;
+
/**
* A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped
* {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link
@@ -28,16 +36,14 @@ public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT>
private FiniteStormSpout finiteSpout;
/**
- * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
- * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
- * output
- * type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
- * number of attributes.
- *
+ * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
+ * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
+ * {@link Tuple25} depending on the spout's declared number of attributes.
+ *
* @param spout
- * The Storm {@link FiniteStormSpout spout} to be used. @throws
- * IllegalArgumentException If
- * the number of declared output attributes is not with range [1;25].
+ * The Storm {@link FiniteStormSpout spout} to be used.
+ * @throws IllegalArgumentException
+ * If the number of declared output attributes is not within range [0;25].
*/
public FiniteStormSpoutWrapper(FiniteStormSpout spout)
throws IllegalArgumentException {
@@ -46,36 +52,53 @@ public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT>
}
/**
- * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
- * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
- * output
- * type can be any type if parameter {@code rawOutput} is {@code true} and the spout's
- * number of
- * declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be
- * one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
- * attributes.
- *
+ * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
+ * that it can be used within a Flink streaming program. The output type can be any type if parameter
+ * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
+ * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
+ * number of attributes.
+ *
+ * @param spout
+ * The Storm {@link FiniteStormSpout spout} to be used.
+ * @param rawOutputs
+ * Contains the names of single-attribute output streams that should not be of type {@link Tuple1} but
+ * of a raw type.
+ * @throws IllegalArgumentException
+ * If a stream listed in {@code rawOutputs} does not declare exactly one output attribute, or if the
+ * number of declared output attributes of any other stream is not within range
+ * [0;25].
+ */
+ public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final String[] rawOutputs)
+ throws IllegalArgumentException {
+ this(spout, Sets.newHashSet(rawOutputs));
+ }
+
+ /**
+ * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
+ * that it can be used within a Flink streaming program. The output type can be any type if parameter
+ * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
+ * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
+ * number of attributes.
+ *
* @param spout
- * The Storm {@link FiniteStormSpout spout} to be used.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link
- * Tuple1} but be of a raw type.
+ * The Storm {@link FiniteStormSpout spout} to be used.
+ * @param rawOutputs
+ * Contains the names of single-attribute output streams that should not be of type {@link Tuple1} but
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is
- * not 1
- * or if {@code rawOuput} is {@code false} and the number of declared output attributes
- * is not
- * with range [1;25].
+ * If a stream listed in {@code rawOutputs} does not declare exactly one output attribute, or if the
+ * number of declared output attributes of any other stream is not within range
+ * [0;25].
*/
- public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final boolean rawOutput)
+ public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final Collection<String> rawOutputs)
throws IllegalArgumentException {
- super(spout, rawOutput);
+ super(spout, rawOutputs);
this.finiteSpout = spout;
}
/**
- * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link
- * FiniteStormSpout#reachedEnd()} is true or {@link FiniteStormSpout#cancel()} is called.
+ * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link FiniteStormSpout#reachedEnd()} is true or
+ * {@link FiniteStormSpout#cancel()} is called.
*/
@Override
protected void execute() {
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
index 81ad9a6..e810214 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
@@ -20,12 +20,13 @@ package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.task.IOutputCollector;
import backtype.storm.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.util.Collector;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
/**
@@ -39,19 +40,19 @@ class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOu
private final Collector<OUT> flinkOutput;
/**
- * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink
- * output object. If the number of attributes is specified as zero, any output type is
- * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
- * to {@link Tuple25}.
+ * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink output object. If the
+ * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
+ * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
*
* @param numberOfAttributes
- * The number of attributes of the emitted tuples.
+ * The number of attributes of the emitted tuples per output stream.
* @param flinkOutput
- * The Flink output object to be used.
+ * The Flink output object to be used.
* @throws UnsupportedOperationException
- * if the specified number of attributes is not in the valid range of [0,25]
+ * if the specified number of attributes is greater than 25
*/
- public StormBoltCollector(final int numberOfAttributes, final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
+ public StormBoltCollector(final HashMap<String, Integer> numberOfAttributes,
+ final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
super(numberOfAttributes);
assert (flinkOutput != null);
this.flinkOutput = flinkOutput;
@@ -72,7 +73,7 @@ class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOu
@Override
public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
- return this.transformAndEmit(tuple);
+ return this.tansformAndEmit(streamId, tuple);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index 8bcdae0..05a4902 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -16,23 +16,26 @@
*/
package org.apache.flink.stormcompatibility.wrappers;
+import java.util.Collection;
+import java.util.HashMap;
+
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
+import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-
-
+import com.google.common.collect.Sets;
/**
* A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
@@ -48,10 +51,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = -4788589118464155835L;
- /** The wrapped Storm {@link IRichBolt bolt} */
+ /** The wrapped Storm {@link IRichBolt bolt}. */
private final IRichBolt bolt;
- /** Number of attributes of the bolt's output tuples */
- private final int numberOfAttributes;
+ /** Number of attributes of the bolt's output tuples per stream. */
+ private final HashMap<String, Integer> numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
private final Fields inputSchema;
@@ -64,34 +67,34 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
/**
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
- * for POJO input types. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's
+ * for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's
* declared number of attributes.
*
* @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
+ * The Storm {@link IRichBolt bolt} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not within range [0;25].
*/
public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
- this(bolt, null, false);
+ this(bolt, null, (Collection<String>) null);
}
/**
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
- * {@link Tuple1} to {@link Tuple25}. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on
+ * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on
* the bolt's declared number of attributes.
*
* @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
+ * The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
- * The schema (ie, ordered field names) of the input stream.
+ * The schema (ie, ordered field names) of the input stream.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not within range [0;25].
*/
public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
throws IllegalArgumentException {
- this(bolt, inputSchema, false);
+ this(bolt, inputSchema, (Collection<String>) null);
}
/**
@@ -99,47 +102,93 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
* used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
* for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
* bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
- * of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+ * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
*
* @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type
- * {@link Tuple1} but be of a raw type.
+ * The Storm {@link IRichBolt bolt} to be used.
+ * @param rawOutputs
+ * Names of the output streams whose single-attribute tuples should not be of type {@link Tuple1} but be
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is
- * not 1 or if {@code rawOuput} is {@code false} and the number of declared output
- * attributes is not with range [1;25].
+ * If a stream listed in {@code rawOutputs} does not declare exactly 1 output attribute, or if the
+ * number of declared output attributes of any other stream is not within range
+ * [0;25].
*/
- public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput)
+ public StormBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
- this(bolt, null, rawOutput);
+ this(bolt, null, Sets.newHashSet(rawOutputs));
+ }
+
+ /**
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+ * used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
+ * for POJO input types. The output type of a stream can be any type if the stream is listed in {@code rawOutputs}
+ * and its number of declared output attributes is 1. For all other streams the output type will be one
+ * of {@link Tuple0} to {@link Tuple25} depending on the stream's declared number of attributes.
+ *
+ * @param bolt
+ * The Storm {@link IRichBolt bolt} to be used.
+ * @param rawOutputs
+ * Names of the output streams whose single-attribute tuples should not be of type {@link Tuple1} but be
+ * of a raw type.
+ * @throws IllegalArgumentException
+ * If a stream listed in {@code rawOutputs} does not declare exactly 1 output attribute, or if the
+ * number of declared output attributes of any other stream is not within range
+ * [0;25].
+ */
+ public StormBoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
+ throws IllegalArgumentException {
+ this(bolt, null, rawOutputs);
}
/**
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
- * {@link Tuple1} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
+ * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
- * be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+ * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
*
* @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
+ * The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
- * The schema (ie, ordered field names) of the input stream.
- * @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type
- * {@link Tuple1} but be of a raw type.
+ * The schema (ie, ordered field names) of the input stream.
+ * @param rawOutputs
+ * Names of the output streams whose single-attribute tuples should not be of type {@link Tuple1} but be
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is
- * not 1 or if {@code rawOuput} is {@code false} and the number of declared output
- * attributes is not with range [1;25].
+ * If a stream listed in {@code rawOutputs} does not declare exactly 1 output attribute, or if the
+ * number of declared output attributes of any other stream is not within range
+ * [0;25].
*/
- public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema, final boolean rawOutput)
- throws IllegalArgumentException {
+ public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
+ final String[] rawOutputs) throws IllegalArgumentException {
+ this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
+ }
+
+ /**
+ * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+ * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
+ * {@link Tuple0} to {@link Tuple25}. The output type of a stream can be any type if the stream is listed in
+ * {@code rawOutputs} and its number of declared output attributes is 1. For all other streams the output type will
+ * be one of {@link Tuple0} to {@link Tuple25} depending on the stream's declared number of attributes.
+ *
+ * @param bolt
+ * The Storm {@link IRichBolt bolt} to be used.
+ * @param inputSchema
+ * The schema (ie, ordered field names) of the input stream.
+ * @param rawOutputs
+ * Names of the output streams whose single-attribute tuples should not be of type {@link Tuple1} but be
+ * of a raw type.
+ * @throws IllegalArgumentException
+ * If a stream listed in {@code rawOutputs} does not declare exactly 1 output attribute, or if the
+ * number of declared output attributes of any other stream is not within range
+ * [0;25].
+ */
+ public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
+ final Collection<String> rawOutputs) throws IllegalArgumentException {
this.bolt = bolt;
this.inputSchema = inputSchema;
- this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
+ this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
}
@Override
@@ -151,7 +200,7 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
flinkCollector = new TimestampedCollector<OUT>(output);
OutputCollector stormCollector = null;
- if (this.numberOfAttributes != -1) {
+ if (this.numberOfAttributes.size() > 0) {
stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));
}
@@ -165,10 +214,17 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
this.bolt.cleanup();
}
+ @SuppressWarnings("unchecked")
@Override
public void processElement(final StreamRecord<IN> element) throws Exception {
flinkCollector.setTimestamp(element.getTimestamp());
- this.bolt.execute(new StormTuple<IN>(element.getValue(), inputSchema));
+ IN value = element.getValue();
+ if (value instanceof SplitStreamType) {
+ this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value,
+ inputSchema));
+ } else {
+ this.bolt.execute(new StormTuple<IN>(value, inputSchema));
+ }
}
@Override