You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:38 UTC
[13/15] flink git commit: [Storm Compatibility] Maven module
restructuring and cleanup - removed storm-parent;
renamed storm-core and storm-examples - updated internal Java package
structure * renamed package "stormcompatibility" to "storm" *
[Storm Compatibility] Maven module restructuring and cleanup
- removed storm-parent; renamed storm-core and storm-examples
- updated internal Java package structure
* renamed package "stormcompatibility" to "storm"
* unified *SpoutWrapper* to single SpoutWrapper.java class
* moved classes to appropriate packages
* shortened class names by stripping "Storm"
- some more minor fixes, cleanups, and test improvements
- updated READMEs and web documentation
- updated examples pom.xml to assembly WordCount jars correctly
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cb96708
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cb96708
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cb96708
Branch: refs/heads/master
Commit: 4cb9670892e34bd2321a2848f5195588e4442d0d
Parents: 7a67a60
Author: mjsax <mj...@apache.org>
Authored: Fri Oct 2 18:43:38 2015 +0200
Committer: mjsax <mj...@informatik.hu-berlin.de>
Committed: Tue Oct 6 13:29:32 2015 +0200
----------------------------------------------------------------------
docs/apis/storm_compatibility.md | 111 ++--
.../flink-storm-compatibility-core/README.md | 12 -
.../flink-storm-compatibility-core/pom.xml | 114 ----
.../stormcompatibility/api/FlinkClient.java | 315 ---------
.../api/FlinkLocalCluster.java | 174 -----
.../stormcompatibility/api/FlinkSubmitter.java | 194 ------
.../stormcompatibility/api/FlinkTopology.java | 89 ---
.../api/FlinkTopologyBuilder.java | 407 ------------
.../api/SplitStreamTypeKeySelector.java | 47 --
.../util/FiniteStormSpout.java | 39 --
.../util/FlinkOutputFieldsDeclarer.java | 168 -----
.../util/FlinkStormStreamSelector.java | 48 --
.../util/FlinkTopologyContext.java | 164 -----
.../util/SplitStreamMapper.java | 39 --
.../util/SplitStreamType.java | 52 --
.../util/SplitStreamTypeKeySelector.java | 46 --
.../stormcompatibility/util/StormConfig.java | 123 ----
.../wrappers/AbstractStormCollector.java | 143 ----
.../wrappers/AbstractStormSpoutWrapper.java | 153 -----
.../wrappers/FiniteStormSpoutWrapper.java | 111 ----
.../wrappers/SetupOutputFieldsDeclarer.java | 63 --
.../wrappers/StormBoltCollector.java | 94 ---
.../wrappers/StormBoltWrapper.java | 260 --------
.../wrappers/StormFiniteSpoutWrapper.java | 186 ------
.../wrappers/StormSpoutCollector.java | 82 ---
.../wrappers/StormSpoutWrapper.java | 105 ---
.../stormcompatibility/wrappers/StormTuple.java | 327 ---------
.../wrappers/StormWrapperSetupHelper.java | 268 --------
.../api/FlinkTopologyBuilderTest.java | 76 ---
.../api/FlinkTopologyTest.java | 62 --
.../flink/stormcompatibility/api/TestBolt.java | 48 --
.../flink/stormcompatibility/api/TestSpout.java | 59 --
.../api/TestTopologyBuilder.java | 27 -
.../stormcompatibility/util/AbstractTest.java | 39 --
.../util/FiniteTestSpout.java | 77 ---
.../util/FlinkOutputFieldsDeclarerTest.java | 193 ------
.../util/FlinkStormStreamSelectorTest.java | 51 --
.../util/FlinkTopologyContextTest.java | 114 ----
.../stormcompatibility/util/TestDummyBolt.java | 71 --
.../stormcompatibility/util/TestDummySpout.java | 79 ---
.../flink/stormcompatibility/util/TestSink.java | 60 --
.../wrappers/FiniteStormSpoutWrapperTest.java | 76 ---
.../wrappers/SetupOutputFieldsDeclarerTest.java | 91 ---
.../wrappers/StormBoltCollectorTest.java | 105 ---
.../wrappers/StormBoltWrapperTest.java | 339 ----------
.../wrappers/StormFiniteSpoutWrapperTest.java | 117 ----
.../wrappers/StormSpoutCollectorTest.java | 94 ---
.../wrappers/StormSpoutWrapperTest.java | 120 ----
.../wrappers/StormTupleTest.java | 659 ------------------
.../wrappers/StormWrapperSetupHelperTest.java | 315 ---------
.../wrappers/TestContext.java | 56 --
.../src/test/resources/log4j-test.properties | 27 -
.../src/test/resources/log4j.properties | 27 -
.../src/test/resources/logback-test.xml | 30 -
.../README.md | 20 -
.../flink-storm-compatibility-examples/pom.xml | 362 ----------
.../excamation/ExclamationTopology.java | 128 ----
.../excamation/ExclamationWithStormBolt.java | 143 ----
.../excamation/ExclamationWithStormSpout.java | 150 -----
.../excamation/StormExclamationLocal.java | 75 ---
.../StormExclamationRemoteByClient.java | 83 ---
.../StormExclamationRemoteBySubmitter.java | 81 ---
.../stormoperators/ExclamationBolt.java | 75 ---
.../singlejoin/SingleJoinTopology.java | 90 ---
.../singlejoin/StormSingleJoinLocal.java | 50 --
.../singlejoin/stormoperators/AgeSpout.java | 54 --
.../singlejoin/stormoperators/GenderSpout.java | 47 --
.../stormoperators/SingleJoinBolt.java | 132 ----
.../split/SpoutSplitExample.java | 102 ---
.../split/stormoperators/RandomSpout.java | 76 ---
.../stormoperators/VerifyAndEnrichBolt.java | 61 --
.../util/AbstractStormBoltSink.java | 76 ---
.../util/AbstractStormSpout.java | 70 --
.../util/FiniteStormFileSpout.java | 77 ---
.../util/FiniteStormInMemorySpout.java | 40 --
.../util/OutputFormatter.java | 36 -
.../util/SimpleOutputFormatter.java | 42 --
.../util/StormBoltFileSink.java | 76 ---
.../util/StormBoltPrintSink.java | 45 --
.../stormcompatibility/util/StormFileSpout.java | 88 ---
.../util/StormInMemorySpout.java | 42 --
.../util/TupleOutputFormatter.java | 38 --
.../wordcount/BoltTokenizerWordCount.java | 122 ----
.../wordcount/BoltTokenizerWordCountPojo.java | 135 ----
.../BoltTokenizerWordCountWithNames.java | 138 ----
.../wordcount/SpoutSourceWordCount.java | 157 -----
.../wordcount/StormWordCountLocal.java | 75 ---
.../wordcount/StormWordCountNamedLocal.java | 76 ---
.../wordcount/StormWordCountRemoteByClient.java | 85 ---
.../StormWordCountRemoteBySubmitter.java | 83 ---
.../wordcount/WordCountTopology.java | 135 ----
.../stormoperators/StormBoltCounter.java | 88 ---
.../stormoperators/StormBoltCounterByName.java | 88 ---
.../stormoperators/StormBoltTokenizer.java | 78 ---
.../StormBoltTokenizerByName.java | 78 ---
.../stormoperators/StormWordCountFileSpout.java | 39 --
.../StormWordCountInMemorySpout.java | 40 --
.../stormoperators/WordCountDataPojos.java | 59 --
.../stormoperators/WordCountDataTuple.java | 34 -
.../stormcompatibility/api/StormTestBase.java | 117 ----
.../ExclamationWithStormBoltITCase.java | 49 --
.../ExclamationWithStormSpoutITCase.java | 46 --
.../StormExclamationLocalITCase.java | 48 --
.../exclamation/util/ExclamationData.java | 98 ---
.../split/BoltSplitITCase.java | 28 -
.../stormcompatibility/split/SplitBolt.java | 61 --
.../split/SplitBoltTopology.java | 87 ---
.../split/SplitSpoutTopology.java | 85 ---
.../split/SpoutSplitITCase.java | 28 -
.../split/StormSplitStreamBoltLocal.java | 51 --
.../split/StormSplitStreamSpoutLocal.java | 51 --
.../wordcount/BoltTokenizerWordCountITCase.java | 45 --
.../BoltTokenizerWordCountPojoITCase.java | 45 --
.../BoltTokenizerWordCountWithNamesITCase.java | 45 --
.../wordcount/SpoutSourceWordCountITCase.java | 45 --
.../wordcount/StormWordCountLocalITCase.java | 45 --
.../StormWordCountLocalNamedITCase.java | 45 --
.../src/test/resources/log4j-test.properties | 27 -
.../src/test/resources/log4j.properties | 27 -
.../src/test/resources/logback-test.xml | 30 -
flink-contrib/flink-storm-compatibility/pom.xml | 40 --
flink-contrib/flink-storm-examples/README.md | 20 +
flink-contrib/flink-storm-examples/pom.xml | 364 ++++++++++
.../storm/excamation/ExclamationLocal.java | 75 +++
.../storm/excamation/ExclamationTopology.java | 123 ++++
.../storm/excamation/ExclamationWithBolt.java | 144 ++++
.../storm/excamation/ExclamationWithSpout.java | 150 +++++
.../excamation/operators/ExclamationBolt.java | 75 +++
.../flink/storm/split/SpoutSplitExample.java | 102 +++
.../storm/split/operators/RandomSpout.java | 76 +++
.../split/operators/VerifyAndEnrichBolt.java | 61 ++
.../flink/storm/util/AbstractBoltSink.java | 76 +++
.../flink/storm/util/AbstractLineSpout.java | 70 ++
.../apache/flink/storm/util/BoltFileSink.java | 76 +++
.../apache/flink/storm/util/BoltPrintSink.java | 45 ++
.../org/apache/flink/storm/util/FileSpout.java | 88 +++
.../flink/storm/util/FiniteFileSpout.java | 77 +++
.../flink/storm/util/FiniteInMemorySpout.java | 40 ++
.../apache/flink/storm/util/InMemorySpout.java | 42 ++
.../flink/storm/util/OutputFormatter.java | 37 ++
.../flink/storm/util/SimpleOutputFormatter.java | 41 ++
.../flink/storm/util/TupleOutputFormatter.java | 38 ++
.../storm/wordcount/BoltTokenizerWordCount.java | 122 ++++
.../wordcount/BoltTokenizerWordCountPojo.java | 134 ++++
.../BoltTokenizerWordCountWithNames.java | 137 ++++
.../storm/wordcount/SpoutSourceWordCount.java | 157 +++++
.../flink/storm/wordcount/WordCountLocal.java | 76 +++
.../storm/wordcount/WordCountLocalByName.java | 77 +++
.../wordcount/WordCountRemoteByClient.java | 86 +++
.../wordcount/WordCountRemoteBySubmitter.java | 84 +++
.../storm/wordcount/WordCountTopology.java | 136 ++++
.../storm/wordcount/operators/BoltCounter.java | 90 +++
.../wordcount/operators/BoltCounterByName.java | 90 +++
.../wordcount/operators/BoltTokenizer.java | 78 +++
.../operators/BoltTokenizerByName.java | 78 +++
.../wordcount/operators/WordCountDataPojos.java | 59 ++
.../wordcount/operators/WordCountDataTuple.java | 34 +
.../wordcount/operators/WordCountFileSpout.java | 39 ++
.../operators/WordCountInMemorySpout.java | 40 ++
.../exclamation/ExclamationWithBoltITCase.java | 49 ++
.../exclamation/ExclamationWithSpoutITCase.java | 46 ++
.../StormExclamationLocalITCase.java | 48 ++
.../storm/exclamation/util/ExclamationData.java | 98 +++
.../flink/storm/split/BoltSplitITCase.java | 28 +
.../org/apache/flink/storm/split/SplitBolt.java | 61 ++
.../flink/storm/split/SplitBoltTopology.java | 87 +++
.../flink/storm/split/SplitSpoutTopology.java | 85 +++
.../flink/storm/split/SplitStreamBoltLocal.java | 51 ++
.../storm/split/SplitStreamSpoutLocal.java | 51 ++
.../flink/storm/split/SpoutSplitITCase.java | 28 +
.../apache/flink/storm/util/StormTestBase.java | 117 ++++
.../wordcount/BoltTokenizerWordCountITCase.java | 46 ++
.../BoltTokenizerWordCountPojoITCase.java | 46 ++
.../BoltTokenizerWordCountWithNamesITCase.java | 46 ++
.../wordcount/SpoutSourceWordCountITCase.java | 46 ++
.../storm/wordcount/WordCountLocalITCase.java | 46 ++
.../wordcount/WordCountLocalNamedITCase.java | 46 ++
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/log4j.properties | 27 +
.../src/test/resources/logback-test.xml | 30 +
flink-contrib/flink-storm/README.md | 15 +
flink-contrib/flink-storm/pom.xml | 114 ++++
.../org/apache/flink/storm/api/FlinkClient.java | 315 +++++++++
.../flink/storm/api/FlinkLocalCluster.java | 173 +++++
.../storm/api/FlinkOutputFieldsDeclarer.java | 168 +++++
.../apache/flink/storm/api/FlinkSubmitter.java | 194 ++++++
.../apache/flink/storm/api/FlinkTopology.java | 89 +++
.../flink/storm/api/FlinkTopologyBuilder.java | 397 +++++++++++
.../apache/flink/storm/util/FiniteSpout.java | 36 +
.../flink/storm/util/SplitStreamMapper.java | 39 ++
.../flink/storm/util/SplitStreamType.java | 52 ++
.../storm/util/SplitStreamTypeKeySelector.java | 46 ++
.../apache/flink/storm/util/StormConfig.java | 123 ++++
.../flink/storm/util/StormStreamSelector.java | 48 ++
.../storm/wrappers/AbstractStormCollector.java | 143 ++++
.../flink/storm/wrappers/BoltCollector.java | 89 +++
.../flink/storm/wrappers/BoltWrapper.java | 260 ++++++++
.../storm/wrappers/FlinkTopologyContext.java | 165 +++++
.../wrappers/SetupOutputFieldsDeclarer.java | 66 ++
.../flink/storm/wrappers/SpoutCollector.java | 81 +++
.../flink/storm/wrappers/SpoutWrapper.java | 274 ++++++++
.../apache/flink/storm/wrappers/StormTuple.java | 327 +++++++++
.../storm/wrappers/WrapperSetupHelper.java | 266 ++++++++
.../api/FlinkOutputFieldsDeclarerTest.java | 194 ++++++
.../storm/api/FlinkTopologyBuilderTest.java | 77 +++
.../flink/storm/api/FlinkTopologyTest.java | 63 ++
.../org/apache/flink/storm/api/TestBolt.java | 48 ++
.../org/apache/flink/storm/api/TestSpout.java | 59 ++
.../flink/storm/api/TestTopologyBuilder.java | 29 +
.../apache/flink/storm/util/AbstractTest.java | 39 ++
.../flink/storm/util/FiniteTestSpout.java | 77 +++
.../storm/util/StormStreamSelectorTest.java | 51 ++
.../apache/flink/storm/util/TestDummyBolt.java | 74 +++
.../apache/flink/storm/util/TestDummySpout.java | 82 +++
.../org/apache/flink/storm/util/TestSink.java | 60 ++
.../flink/storm/wrappers/BoltCollectorTest.java | 87 +++
.../flink/storm/wrappers/BoltWrapperTest.java | 355 ++++++++++
.../wrappers/FlinkTopologyContextTest.java | 115 ++++
.../wrappers/SetupOutputFieldsDeclarerTest.java | 92 +++
.../storm/wrappers/SpoutCollectorTest.java | 88 +++
.../flink/storm/wrappers/SpoutWrapperTest.java | 220 +++++++
.../flink/storm/wrappers/StormTupleTest.java | 660 +++++++++++++++++++
.../flink/storm/wrappers/TestContext.java | 56 ++
.../storm/wrappers/WrapperSetupHelperTest.java | 317 +++++++++
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/log4j.properties | 27 +
.../src/test/resources/logback-test.xml | 30 +
flink-contrib/pom.xml | 3 +-
228 files changed, 10902 insertions(+), 11912 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index d676db8..bf80d4e 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -36,33 +36,34 @@ This document shows how to use existing Storm code with Flink.
# Project Configuration
-Support for Storm is contained in the `flink-storm-compatibility-core` Maven module.
-The code resides in the `org.apache.flink.stormcompatibility` package.
+Support for Storm is contained in the `flink-storm` Maven module.
+The code resides in the `org.apache.flink.storm` package.
Add the following dependency to your `pom.xml` if you want to execute Storm code in Flink.
~~~xml
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-storm-compatibility-core</artifactId>
+ <artifactId>flink-storm</artifactId>
<version>{{site.version}}</version>
</dependency>
~~~
-**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution.
-Thus, you need to include `flink-storm-compatiblitly-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
-See *WordCount Storm* within `flink-storm-compatibility-example/pom.xml` for an example how to package a jar correctly.
+**Please note**: `flink-storm` is not part of the provided binary Flink distribution.
+Thus, you need to include `flink-storm` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
+See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example of how to package a jar correctly.
# Execute Storm Topologies
-Flink provides a Storm compatible API (`org.apache.flink.stormcompatibility.api`) that offers replacements for the following classes:
+Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:
- `TopologyBuilder` replaced by `FlinkTopologyBuilder`
- `StormSubmitter` replaced by `FlinkSubmitter`
- `NimbusClient` and `Client` replaced by `FlinkClient`
- `LocalCluster` replaced by `FlinkLocalCluster`
-In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classed with their Flink replacements in the original Storm client code that assembles the topology.
+In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
+The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*.
If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively.
If a parameter is not specified, the value is taken from `flink-conf.yaml`.
@@ -71,10 +72,11 @@ If a parameter is not specified, the value is taken from `flink-conf.yaml`.
~~~java
FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new TopologyBuilder();
-builder.setSpout("source", new StormFileSpout(inputFilePath));
-builder.setBolt("tokenizer", new StormBoltTokenizer()).shuffleGrouping("source");
-builder.setBolt("counter", new StormBoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
-builder.setBolt("sink", new StormBoltFileSink(outputFilePath)).shuffleGrouping("counter");
+// actual topology assembling code and used Spouts/Bolts can be used as-is
+builder.setSpout("source", new FileSpout(inputFilePath));
+builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
+builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
+builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
Config conf = new Config();
if(runLocal) { // submit to test cluster
@@ -93,7 +95,7 @@ if(runLocal) { // submit to test cluster
# Embed Storm Operators in Flink Streaming Programs
As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
-The Storm compatibility layer offers a wrapper classes for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
+The Storm compatibility layer offers a wrapper class for each, namely `SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).
Per default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple0` to `Tuple25` according to the number of fields of the Storm tuples).
For single field output tuples a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).
@@ -104,7 +106,7 @@ In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` ca
## Embed Spouts
In order to use a Spout as Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
-The Spout object is handed to the constructor of `StormSpoutWrapper<OUT>` that serves as first argument to `addSource(...)`.
+The Spout object is handed to the constructor of `SpoutWrapper<OUT>` that serves as first argument to `addSource(...)`.
The generic type declaration `OUT` specifies the type of the source output stream.
<div class="codetabs" markdown="1">
@@ -114,7 +116,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
// stream has `raw` type (single field output streams only)
DataStream<String> rawInput = env.addSource(
- new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
+ new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
TypeExtractor.getForClass(String.class)); // output type
// process data stream
@@ -123,15 +125,15 @@ DataStream<String> rawInput = env.addSource(
</div>
</div>
-If a Spout emits a finite number of tuples, `StormFiniteSpoutWrapper` can be used instead of `StormSpoutWrapper`.
-Using `StormFiniteSpoutWrapper` allows the Flink program to shut down automatically after all data is processed.
-If `StormSpoutWrapper` is used, the program will run until it is [canceled](cli.html) manually.
+If a Spout emits a finite number of tuples, `SpoutWrapper` can be configured to terminate automatically by setting the `numberOfInvocations` parameter in its constructor.
+This allows the Flink program to shut down automatically after all data is processed.
+Per default the program will run until it is [canceled](cli.html) manually.
## Embed Bolts
In order to use a Bolt as Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
-The Bolt object is handed to the constructor of `StormBoltWrapper<IN,OUT>` that serves as last argument to `transform(...)`.
+The Bolt object is handed to the constructor of `BoltWrapper<IN,OUT>` that serves as last argument to `transform(...)`.
The generic type declarations `IN` and `OUT` specify the type of the operator's input and output stream, respectively.
<div class="codetabs" markdown="1">
@@ -143,7 +145,7 @@ DataStream<String> text = env.readTextFile(localFilePath);
DataStream<Tuple2<String, Integer>> counts = text.transform(
"tokenizer", // operator name
TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
- new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer())); // Bolt operator
+ new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
// do further processing
[...]
@@ -164,16 +166,16 @@ For this case, Flink expects either a corresponding public member variable or pu
For example, if a Bolt accesses a field via name `sentence` (eg, `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or method `public String getSentence() { ... };` (pay attention to camel-case naming).
For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
-For this case, the constructor of `StormBoltWrapper` takes an additional argument: `new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(new StormBoltTokenizerByName(), new Fields("sentence"))`.
+For this case, the constructor of `BoltWrapper` takes an additional argument: `new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))`.
The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
-See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
+See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
## Configuring Spouts and Bolts
In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is given to `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
This `Map` is provided by the user next to the topology and gets forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
-If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
For embedded usage, Flink's configuration mechanism must be used.
A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
@@ -202,12 +204,12 @@ env.getConfig().setGlobalJobParameters(config);
## Multiple Output Streams
Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
-If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
-For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitDataStream.select(...)`.
-Flink provides the predefined output selector `FlinkStormStreamSelector<T>` for `.split(...)` already.
+For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
+Flink provides the predefined output selector `StormStreamSelector<T>` for `.split(...)` already.
Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
-If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, `SplitStreamTuple<T>` must **not** be removed – `StormBoltWrapper` removes it automatically.
+If a data stream of type `SplitStreamType<T>` is used as input for a Bolt, it is **not** required to strip the wrapper – `BoltWrapper` removes it automatically.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -217,11 +219,11 @@ If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, `Spl
// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
DataStream<SplitStreamType<SomeType>> multiStream = ...
-SplitDataStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new FlinkStormStreamSelector<SomeType>());
+SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
-// apply Bolt directly, without stripping SplitStreamMapper
+// apply Bolt directly, without stripping SplitStreamType
DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt for further processing */);
// do further processing on s1 and s2
@@ -230,67 +232,48 @@ DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt f
</div>
</div>
-See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
+See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java) for a full example.
# Flink Extensions
-## Finite Storm Spouts
+## Finite Spouts
-In Flink streaming, sources can be finite – i.e., emit a finite number of records and stop after emitting the last record –, however, Storm spouts always emit infinite streams.
-The bridge between the two approach is the `FiniteStormSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can specify a stopping-condition.
-The user can create a finite Storm spout by implementing this interface instead of `IRichSpout`, and implementing the `reachedEnd()`method in addition.
-When used as part of a Flink topology, a `FiniteStormSpout` should be wrapped by `FiniteStormSpoutWrapper`.
+In Flink, streaming sources can be finite, ie, emit a finite number of records and stop after emitting the last record. However, Spouts usually emit infinite streams.
+The bridge between the two approaches is the `FiniteSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can specify a stopping-condition.
+The user can create a finite Spout by implementing this interface instead of (or in addition to) `IRichSpout`, and implementing the `reachedEnd()` method.
+In contrast to a `SpoutWrapper` that is configured to emit a finite number of tuples, the `FiniteSpout` interface allows implementing more complex termination criteria.
-Although finite Storm spouts are not necessary to embed Storm spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy:
+Although finite Spouts are not necessary to embed Spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy:
- * to achieve that a native Storm spout behaves the same way as a finite Flink source with minimal modifications
- * the user wants to process a stream only for some time; after that, the spout can stop automatically
+ * to achieve that a native Spout behaves the same way as a finite Flink source with minimal modifications
+ * the user wants to process a stream only for some time; after that, the Spout can stop automatically
* reading a file into a stream
* for testing purposes
-A `FiniteStormSpout` can be still used as a normal, infinite Storm spout by changing its wrapper class to `StormSpoutWraper` in the Flink topology.
-
-An example of a finite Storm spout that emits records for 10 seconds only:
+An example of a finite Spout that emits records for 10 seconds only:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
-public class TimedFiniteStormSpout extends AbstractStormSpout implements FiniteStormSpout {
- [...]
+public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
+ [...] // implement open(), nextTuple(), ...
+
private long starttime = System.currentTimeMillis();
public boolean reachedEnd() {
return System.currentTimeMillis() - starttime > 10000l;
}
- [...]
}
~~~
</div>
</div>
-Using a `FiniteStormSpout` in a Flink topology:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> rawInput = env.addSource(
- new FiniteStormSpoutWrapper<String>(new TimedFiniteStormSpout(), true)
- TypeExtractor.getForClass(String.class));
-
-// process data stream
-[...]
-~~~
-</div>
-</div>
-
# Storm Compatibility Examples
-You can find more examples in Maven module `flink-storm-compatibilty-examples`.
-For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md).
+You can find more examples in Maven module `flink-storm-examples`.
+For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/README.md).
To run the examples, you need to assemble a correct jar file.
-`flink-storm-compatibility-examples-0.10-SNAPSHOT.jar` is **no** valid jar file for job execution (it is only a standard maven artifact).
+`flink-storm-examples-0.10-SNAPSHOT.jar` is **not** a valid jar file for job execution (it is only a standard maven artifact).
There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
Compare `pom.xml` to see how both jars are built.
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
deleted file mode 100644
index 9663fc7..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ /dev/null
@@ -1,12 +0,0 @@
-# flink-storm-compatibility
-
-The Storm compatibility layer allows embedding spouts or bolts unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
-
-The following Storm features are not (yet/fully) supported by the compatibility layer right now:
-* tuple meta information
-* fault-tolerance guarantees (i.e., calls to `ack()`/`fail()` and anchoring are ignored)
-* for whole Storm topologies the following is not supported by Flink:
- * direct emit connection pattern
- * activating/deactivating and rebalancing of topologies
- * task hooks
- * custom metrics
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
deleted file mode 100644
index cced678..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
+++ /dev/null
@@ -1,114 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-storm-compatibility-parent</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-storm-compatibility-core</artifactId>
- <name>flink-storm-compatibility-core</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>0.9.4</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>logback-classic</artifactId>
- <groupId>ch.qos.logback</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <versionRange>[2.9,)</versionRange>
- <goals>
- <goal>unpack</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
deleted file mode 100644
index 4676102..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import com.google.common.collect.Lists;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-
-import scala.Some;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
- * Flink's JobManager instead of Storm's Nimbus.
- */
-public class FlinkClient {
-
- /** The client's configuration */
- private final Map<?,?> conf;
- /** The jobmanager's host name */
- private final String jobManagerHost;
- /** The jobmanager's rpc port */
- private final int jobManagerPort;
- /** The user specified timeout in milliseconds */
- private final String timeout;
-
- // The following methods are derived from "backtype.storm.utils.NimbusClient"
-
- /**
- * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
- * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
- *
- * @param conf
- * A configuration.
- * @param host
- * The jobmanager's host name.
- * @param port
- * The jobmanager's rpc port.
- */
- @SuppressWarnings("rawtypes")
- public FlinkClient(final Map conf, final String host, final int port) {
- this(conf, host, port, null);
- }
-
- /**
- * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
- * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
- *
- * @param conf
- * A configuration.
- * @param host
- * The jobmanager's host name.
- * @param port
- * The jobmanager's rpc port.
- * @param timeout
- * Timeout
- */
- @SuppressWarnings("rawtypes")
- public FlinkClient(final Map conf, final String host, final int port, final Integer timeout) {
- this.conf = conf;
- this.jobManagerHost = host;
- this.jobManagerPort = port;
- if (timeout != null) {
- this.timeout = timeout + " ms";
- } else {
- this.timeout = null;
- }
- }
-
- /**
- * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
- * Config#NIMBUS_THRIFT_PORT} as JobManager address.
- *
- * @param conf
- * Configuration that contains the jobmanager's hostname and port.
- * @return A configured {@link FlinkClient}.
- */
- @SuppressWarnings("rawtypes")
- public static FlinkClient getConfiguredClient(final Map conf) {
- final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
- final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
- return new FlinkClient(conf, nimbusHost, nimbusPort);
- }
-
- /**
- * Return a reference to itself.
- * <p/>
- * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once.
- *
- * @return A reference to itself.
- */
- public FlinkClient getClient() {
- return this;
- }
-
- // The following methods are derived from "backtype.storm.generated.Nimbus.Client"
-
- /**
- * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
- * uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
- */
- public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
- this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
- }
-
- /**
- * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
- * uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
- */
- public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
- topology)
- throws AlreadyAliveException, InvalidTopologyException {
-
- if (this.getTopologyJobId(name) != null) {
- throw new AlreadyAliveException();
- }
-
- final File uploadedJarFile = new File(uploadedJarLocation);
- try {
- JobWithJars.checkJarFile(uploadedJarFile);
- } catch (final IOException e) {
- throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
- }
-
- /* set storm configuration */
- if (this.conf != null) {
- topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
- }
-
- final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
- jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
-
- final Configuration configuration = jobGraph.getJobConfiguration();
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-
- final Client client;
- try {
- client = new Client(configuration);
- } catch (IOException e) {
- throw new RuntimeException("Could not establish a connection to the job manager", e);
- }
-
- try {
- ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
- Lists.newArrayList(uploadedJarFile),
- this.getClass().getClassLoader());
- client.runDetached(jobGraph, classLoader);
- } catch (final ProgramInvocationException e) {
- throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
- }
- }
-
- public void killTopology(final String name) throws NotAliveException {
- this.killTopologyWithOpts(name, null);
- }
-
- public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
- final JobID jobId = this.getTopologyJobId(name);
- if (jobId == null) {
- throw new NotAliveException();
- }
-
- try {
- final ActorRef jobManager = this.getJobManager();
-
- if (options != null) {
- try {
- Thread.sleep(1000 * options.get_wait_secs());
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- final FiniteDuration askTimeout = this.getTimeout();
- final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
- try {
- Await.result(response, askTimeout);
- } catch (final Exception e) {
- throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
- }
- } catch (final IOException e) {
- throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
- + ":" + this.jobManagerPort, e);
- }
- }
-
- // Flink specific additional methods
-
- /**
- * Package internal method to get a Flink {@link JobID} from a Storm topology name.
- *
- * @param id
- * The Storm topology name.
- * @return Flink's internally used {@link JobID}.
- */
- JobID getTopologyJobId(final String id) {
- final Configuration configuration = GlobalConfiguration.getConfiguration();
- if (this.timeout != null) {
- configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
- }
-
- try {
- final ActorRef jobManager = this.getJobManager();
-
- final FiniteDuration askTimeout = this.getTimeout();
- final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
- new Timeout(askTimeout));
-
- Object result;
- try {
- result = Await.result(response, askTimeout);
- } catch (final Exception e) {
- throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
- }
-
- if (result instanceof RunningJobsStatus) {
- final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
-
- for (final JobStatusMessage status : jobs) {
- if (status.getJobName().equals(id)) {
- return status.getJobId();
- }
- }
- } else {
- throw new RuntimeException("ReqeustRunningJobs requires a response of type "
- + "RunningJobs. Instead the response is of type " + result.getClass() + ".");
- }
- } catch (final IOException e) {
- throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
- + ":" + this.jobManagerPort, e);
- }
-
- return null;
- }
-
- private FiniteDuration getTimeout() {
- final Configuration configuration = GlobalConfiguration.getConfiguration();
- if (this.timeout != null) {
- configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
- }
-
- return AkkaUtils.getTimeout(configuration);
- }
-
- private ActorRef getJobManager() throws IOException {
- final Configuration configuration = GlobalConfiguration.getConfiguration();
-
- ActorSystem actorSystem;
- try {
- final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
- actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
- systemEndpoint));
- } catch (final Exception e) {
- throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
- }
-
- return JobManager.getJobManagerActorRef(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
- actorSystem, AkkaUtils.getLookupTimeout(configuration));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
deleted file mode 100644
index 9b3fb54..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
- */
-public class FlinkLocalCluster {
-
- /** The log used by this mini cluster */
- private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
-
- /** The flink mini cluster on which to execute the programs */
- private final FlinkMiniCluster flink;
-
-
- public FlinkLocalCluster() {
- this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
- this.flink.start();
- }
-
- public FlinkLocalCluster(FlinkMiniCluster flink) {
- this.flink = Objects.requireNonNull(flink);
- }
-
- @SuppressWarnings("rawtypes")
- public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
- throws Exception {
- this.submitTopologyWithOpts(topologyName, conf, topology, null);
- }
-
- @SuppressWarnings("rawtypes")
- public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
- LOG.info("Running Storm topology on FlinkLocalCluster");
-
- if(conf != null) {
- topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
- }
-
- JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
- this.flink.submitJobDetached(jobGraph);
- }
-
- public void killTopology(final String topologyName) {
- this.killTopologyWithOpts(topologyName, null);
- }
-
- public void killTopologyWithOpts(final String name, final KillOptions options) {
- }
-
- public void activate(final String topologyName) {
- }
-
- public void deactivate(final String topologyName) {
- }
-
- public void rebalance(final String name, final RebalanceOptions options) {
- }
-
- public void shutdown() {
- flink.stop();
- }
-
- public String getTopologyConf(final String id) {
- return null;
- }
-
- public StormTopology getTopology(final String id) {
- return null;
- }
-
- public ClusterSummary getClusterInfo() {
- return null;
- }
-
- public TopologyInfo getTopologyInfo(final String id) {
- return null;
- }
-
- public Map<?, ?> getState() {
- return null;
- }
-
- // ------------------------------------------------------------------------
- // Access to default local cluster
- // ------------------------------------------------------------------------
-
- // A different {@link FlinkLocalCluster} to be used for execution of ITCases
- private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
-
- /**
- * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
- * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
- *
- * @return a {@link FlinkLocalCluster} to be used for execution
- */
- public static FlinkLocalCluster getLocalCluster() {
- return currentFactory.createLocalCluster();
- }
-
- /**
- * Sets a different factory for FlinkLocalClusters to be used for execution.
- *
- * @param clusterFactory
- * The LocalClusterFactory to create the local clusters for execution.
- */
- public static void initialize(LocalClusterFactory clusterFactory) {
- currentFactory = Objects.requireNonNull(clusterFactory);
- }
-
- // ------------------------------------------------------------------------
- // Cluster factory
- // ------------------------------------------------------------------------
-
- /**
- * A factory that creates local clusters.
- */
- public static interface LocalClusterFactory {
-
- /**
- * Creates a local flink cluster.
- * @return A local flink cluster.
- */
- FlinkLocalCluster createLocalCluster();
- }
-
- /**
- * A factory that instantiates a FlinkLocalCluster.
- */
- public static class DefaultLocalClusterFactory implements LocalClusterFactory {
-
- @Override
- public FlinkLocalCluster createLocalCluster() {
- return new FlinkLocalCluster();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
deleted file mode 100644
index 5f3f31e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-
-/**
- * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
- */
-public class FlinkSubmitter {
- public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @param opts
- * to manipulate the starting of the topology.
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- */
- public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
- final SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology);
- }
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given {@link
- * FlinkProgressListener} is ignored because progress bars are not supported by Flink.
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- */
- @SuppressWarnings({"rawtypes", "unchecked"})
- public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
- if (!Utils.isValidConf(stormConf)) {
- throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
- }
-
- final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
- if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
- stormConf.put(Config.NIMBUS_HOST,
- flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
- }
- if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
- stormConf.put(Config.NIMBUS_THRIFT_PORT,
- new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- 6123)));
- }
-
- final String serConf = JSONValue.toJSONString(stormConf);
-
- final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
- try {
- if (client.getTopologyJobId(name) != null) {
- throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
- }
- String localJar = System.getProperty("storm.jar");
- if (localJar == null) {
- try {
- for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
- .getJars()) {
- // TODO verify that there is only one jar
- localJar = file.getAbsolutePath();
- }
- } catch (final ClassCastException e) {
- // ignore
- }
- }
-
- logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
- client.submitTopologyWithOpts(name, localJar, topology);
- } catch (final InvalidTopologyException e) {
- logger.warn("Topology submission exception: " + e.get_msg());
- throw e;
- } catch (final AlreadyAliveException e) {
- logger.warn("Topology already alive exception", e);
- throw e;
- }
-
- logger.info("Finished submitting topology: " + name);
- }
-
- /**
- * Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
- * Flink.
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- */
- public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
- final FlinkTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology);
- }
-
- /**
- * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
- * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
- * environment.
- *
- * @param conf
- * the topology-specific configuration. See {@link Config}.
- * @param localJar
- * file path of the jar file to submit
- * @return the value of parameter localJar
- */
- @SuppressWarnings("rawtypes")
- public static String submitJar(final Map conf, final String localJar) {
- return submitJar(localJar);
- }
-
- /**
- * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
- * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
- * environment.
- *
- * @param localJar
- * file path of the jar file to submit
- * @return the value of parameter localJar
- */
- public static String submitJar(final String localJar) {
- if (localJar == null) {
- throw new RuntimeException(
- "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
- "to upload");
- }
-
- return localJar;
- }
-
- /**
- * Dummy interface used to track progress of file upload. Does not do anything. Kept for compatibility.
- */
- public interface FlinkProgressListener {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
deleted file mode 100644
index 179466e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.StormTopology;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
- */
-public class FlinkTopology extends StreamExecutionEnvironment {
-
- /** The number of declared tasks for the whole program (ie, sum over all dops) */
- private int numberOfTasks = 0;
-
- public FlinkTopology() {
- // Set default parallelism to 1, to mirror Storm default behavior
- super.setParallelism(1);
- }
-
- /**
- * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
- * FlinkClient}.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public JobExecutionResult execute() throws Exception {
- throw new UnsupportedOperationException(
- "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
- "instead.");
- }
-
- /**
- * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
- * FlinkClient}.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public JobExecutionResult execute(final String jobName) throws Exception {
- throw new UnsupportedOperationException(
- "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
- "instead.");
- }
-
- /**
- * Increased the number of declared tasks of this program by the given value.
- *
- * @param dop
- * The dop of a new operator that increases the number of overall tasks.
- */
- public void increaseNumberOfTasks(final int dop) {
- assert (dop > 0);
- this.numberOfTasks += dop;
- }
-
- /**
- * Return the number or required tasks to execute this program.
- *
- * @return the number or required tasks to execute this program
- */
- public int getNumberOfTasks() {
- return this.numberOfTasks;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
deleted file mode 100644
index d62d56b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.IRichStateSpout;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.util.SplitStreamTypeKeySelector;
-import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
- * topology. Most methods (except {@link #createTopology()} are copied from the original {@link TopologyBuilder}
- * implementation to ensure equal behavior.<br />
- * <br />
- * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
- */
-public class FlinkTopologyBuilder {
-
- /** A Storm {@link TopologyBuilder} to build a real Storm topology */
- private final TopologyBuilder stormBuilder = new TopologyBuilder();
- /** All user spouts by their ID */
- private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
- /** All user bolts by their ID */
- private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
- /** All declared streams and output schemas by operator ID */
- private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
- /** All spouts&bolts declarers by their ID */
- private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
- // needs to be a class member for internal testing purpose
- private StormTopology stormTopology;
-
-
- /**
- * Creates a Flink program that uses the specified spouts and bolts.
- */
- @SuppressWarnings({"rawtypes", "unchecked"})
- public FlinkTopology createTopology() {
- this.stormTopology = this.stormBuilder.createTopology();
-
- final FlinkTopology env = new FlinkTopology();
- env.setParallelism(1);
-
- final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
-
- for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
- final String spoutId = spout.getKey();
- final IRichSpout userSpout = spout.getValue();
-
- final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
- userSpout.declareOutputFields(declarer);
- final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
- this.outputStreams.put(spoutId, sourceStreams);
- declarers.put(spoutId, declarer);
-
- AbstractStormSpoutWrapper spoutWrapper;
-
- if (userSpout instanceof FiniteStormSpout) {
- spoutWrapper = new FiniteStormSpoutWrapper((FiniteStormSpout) userSpout);
- } else {
- spoutWrapper = new StormSpoutWrapper(userSpout);
- }
- spoutWrapper.setStormTopology(stormTopology);
-
- DataStreamSource source;
- HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
- if (sourceStreams.size() == 1) {
- final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
- source = env.addSource(spoutWrapper, spoutId,
- declarer.getOutputType(outputStreamId));
- outputStreams.put(outputStreamId, source);
- } else {
- source = env.addSource(spoutWrapper, spoutId,
- TypeExtractor.getForClass(SplitStreamType.class));
- SplitStream splitSource = source.split(new FlinkStormStreamSelector());
-
- for (String streamId : sourceStreams.keySet()) {
- outputStreams.put(streamId, splitSource.select(streamId));
- }
- }
- availableInputs.put(spoutId, outputStreams);
-
- int dop = 1;
- final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
- if (common.is_set_parallelism_hint()) {
- dop = common.get_parallelism_hint();
- source.setParallelism(dop);
- } else {
- common.set_parallelism_hint(1);
- }
- env.increaseNumberOfTasks(dop);
- }
-
- final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
- unprocessedBolts.putAll(this.bolts);
-
- final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
- new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
-
- /* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
- * its producer
- * ->thus, we might need to repeat multiple times
- */
- boolean makeProgress = true;
- while (unprocessedBolts.size() > 0) {
- if (!makeProgress) {
- throw new RuntimeException(
- "Unable to build Topology. Could not connect the following bolts: "
- + unprocessedBolts.keySet());
- }
- makeProgress = false;
-
- final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
- while (boltsIterator.hasNext()) {
-
- final Entry<String, IRichBolt> bolt = boltsIterator.next();
- final String boltId = bolt.getKey();
- final IRichBolt userBolt = bolt.getValue();
-
- final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
-
- Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
- if (unprocessedInputs == null) {
- unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
- unprocessedInputs.addAll(common.get_inputs().entrySet());
- unprocessdInputsPerBolt.put(boltId, unprocessedInputs);
- }
-
- // connect each available producer to the current bolt
- final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
- while (inputStreamsIterator.hasNext()) {
-
- final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next();
- final String producerId = stormInputStream.getKey().get_componentId();
- final String inputStreamId = stormInputStream.getKey().get_streamId();
-
- HashMap<String, DataStream> producer = availableInputs.get(producerId);
- if (producer != null) {
- makeProgress = true;
-
- DataStream inputStream = producer.get(inputStreamId);
- if (inputStream != null) {
- final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
- userBolt.declareOutputFields(declarer);
- final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams;
- this.outputStreams.put(boltId, boltOutputStreams);
- this.declarers.put(boltId, declarer);
-
- // if producer was processed already
- final Grouping grouping = stormInputStream.getValue();
- if (grouping.is_set_shuffle()) {
- // Storm uses a round-robin shuffle strategy
- inputStream = inputStream.rebalance();
- } else if (grouping.is_set_fields()) {
- // global grouping is emulated in Storm via an empty fields grouping list
- final List<String> fields = grouping.get_fields();
- if (fields.size() > 0) {
- FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
- if (producer.size() == 1) {
- inputStream = inputStream.keyBy(prodDeclarer
- .getGroupingFieldIndexes(inputStreamId,
- grouping.get_fields()));
- } else {
- inputStream = inputStream
- .keyBy(new SplitStreamTypeKeySelector(
- prodDeclarer.getGroupingFieldIndexes(
- inputStreamId,
- grouping.get_fields())));
- }
- } else {
- inputStream = inputStream.global();
- }
- } else if (grouping.is_set_all()) {
- inputStream = inputStream.broadcast();
- } else if (!grouping.is_set_local_or_shuffle()) {
- throw new UnsupportedOperationException(
- "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
- }
-
- SingleOutputStreamOperator outputStream;
- StormBoltWrapper boltWrapper;
- if (boltOutputStreams.size() < 2) { // single output stream or sink
- String outputStreamId = null;
- if (boltOutputStreams.size() == 1) {
- outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
- }
- final TypeInformation<?> outType = declarer
- .getOutputType(outputStreamId);
-
- boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams
- .get(producerId).get(inputStreamId));
- outputStream = inputStream.transform(boltId, outType, boltWrapper);
-
- if (outType != null) {
- // only for non-sink nodes
- HashMap<String, DataStream> op = new HashMap<String, DataStream>();
- op.put(outputStreamId, outputStream);
- availableInputs.put(boltId, op);
- }
- } else {
- final TypeInformation<?> outType = TypeExtractor
- .getForClass(SplitStreamType.class);
-
- boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId));
- outputStream = inputStream.transform(boltId, outType, boltWrapper);
-
- SplitStream splitStreams = outputStream
- .split(new FlinkStormStreamSelector());
-
- HashMap<String, DataStream> op = new HashMap<String, DataStream>();
- for (String outputStreamId : boltOutputStreams.keySet()) {
- op.put(outputStreamId, splitStreams.select(outputStreamId));
- }
- availableInputs.put(boltId, op);
- }
- boltWrapper.setStormTopology(stormTopology);
-
- int dop = 1;
- if (common.is_set_parallelism_hint()) {
- dop = common.get_parallelism_hint();
- outputStream.setParallelism(dop);
- } else {
- common.set_parallelism_hint(1);
- }
- env.increaseNumberOfTasks(dop);
-
- inputStreamsIterator.remove();
- } else {
- throw new RuntimeException("Cannot connect '" + boltId + "' to '"
- + producerId + "'. Stream '" + inputStreamId + "' not found.");
- }
- }
- }
-
- if (unprocessedInputs.size() == 0) {
- // all inputs are connected; processing bolt completed
- boltsIterator.remove();
- }
- }
- }
- return env;
- }
-
- /**
- * Define a new bolt in this topology with parallelism of just one thread.
- *
- * @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
- * @param bolt
- * the bolt
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
- return this.setBolt(id, bolt, null);
- }
-
- /**
- * Define a new bolt in this topology with the specified amount of parallelism.
- *
- * @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
- * @param bolt
- * the bolt
- * @param parallelism_hint
- * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
- * process somewhere around the cluster.
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
- final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint);
- this.bolts.put(id, bolt);
- return declarer;
- }
-
- /**
- * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
- * kind
- * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
- * achieve proper reliability in the topology.
- *
- * @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
- * @param bolt
- * the basic bolt
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
- return this.setBolt(id, bolt, null);
- }
-
- /**
- * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
- * kind
- * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
- * achieve proper reliability in the topology.
- *
- * @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
- * @param bolt
- * the basic bolt
- * @param parallelism_hint
- * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
- * process somwehere around the cluster.
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
- return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
- }
-
- /**
- * Define a new spout in this topology.
- *
- * @param id
- * the id of this component. This id is referenced by other components that want to consume this spout's
- * outputs.
- * @param spout
- * the spout
- */
- public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
- return this.setSpout(id, spout, null);
- }
-
- /**
- * Define a new spout in this topology with the specified parallelism. If the spout declares itself as
- * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
- *
- * @param id
- * the id of this component. This id is referenced by other components that want to consume this spout's
- * outputs.
- * @param parallelism_hint
- * the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
- * process somwehere around the cluster.
- * @param spout
- * the spout
- */
- public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
- final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
- this.spouts.put(id, spout);
- return declarer;
- }
-
- // TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
- /* not implemented by Storm 0.9.4
- * public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
- * this.stormBuilder.setStateSpout(id, stateSpout);
- * }
- * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
- * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
- * }
- */
-
- // for internal testing purpose only
- StormTopology getStormTopology() {
- return this.stormTopology;
- }
-}