Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:38 UTC

[13/15] flink git commit: [Storm Compatibility] Maven module restructuring and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *

[Storm Compatibility] Maven module restructuring and cleanup
  - removed storm-parent; renamed storm-core and storm-examples
  - updated internal Java package structure
    * renamed package "stormcompatibility" to "storm"
    * unified *SpoutWrapper* to single SpoutWrapper.java class
    * moved classes to appropriate packages
    * shortened class names by stripping "Storm"
  - some more minor fixes, cleanups, and test improvements
  - updated READMEs and web documentation
  - updated examples pom.xml to assemble WordCount jars correctly


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cb96708
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cb96708
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cb96708

Branch: refs/heads/master
Commit: 4cb9670892e34bd2321a2848f5195588e4442d0d
Parents: 7a67a60
Author: mjsax <mj...@apache.org>
Authored: Fri Oct 2 18:43:38 2015 +0200
Committer: mjsax <mj...@informatik.hu-berlin.de>
Committed: Tue Oct 6 13:29:32 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                | 111 ++--
 .../flink-storm-compatibility-core/README.md    |  12 -
 .../flink-storm-compatibility-core/pom.xml      | 114 ----
 .../stormcompatibility/api/FlinkClient.java     | 315 ---------
 .../api/FlinkLocalCluster.java                  | 174 -----
 .../stormcompatibility/api/FlinkSubmitter.java  | 194 ------
 .../stormcompatibility/api/FlinkTopology.java   |  89 ---
 .../api/FlinkTopologyBuilder.java               | 407 ------------
 .../api/SplitStreamTypeKeySelector.java         |  47 --
 .../util/FiniteStormSpout.java                  |  39 --
 .../util/FlinkOutputFieldsDeclarer.java         | 168 -----
 .../util/FlinkStormStreamSelector.java          |  48 --
 .../util/FlinkTopologyContext.java              | 164 -----
 .../util/SplitStreamMapper.java                 |  39 --
 .../util/SplitStreamType.java                   |  52 --
 .../util/SplitStreamTypeKeySelector.java        |  46 --
 .../stormcompatibility/util/StormConfig.java    | 123 ----
 .../wrappers/AbstractStormCollector.java        | 143 ----
 .../wrappers/AbstractStormSpoutWrapper.java     | 153 -----
 .../wrappers/FiniteStormSpoutWrapper.java       | 111 ----
 .../wrappers/SetupOutputFieldsDeclarer.java     |  63 --
 .../wrappers/StormBoltCollector.java            |  94 ---
 .../wrappers/StormBoltWrapper.java              | 260 --------
 .../wrappers/StormFiniteSpoutWrapper.java       | 186 ------
 .../wrappers/StormSpoutCollector.java           |  82 ---
 .../wrappers/StormSpoutWrapper.java             | 105 ---
 .../stormcompatibility/wrappers/StormTuple.java | 327 ---------
 .../wrappers/StormWrapperSetupHelper.java       | 268 --------
 .../api/FlinkTopologyBuilderTest.java           |  76 ---
 .../api/FlinkTopologyTest.java                  |  62 --
 .../flink/stormcompatibility/api/TestBolt.java  |  48 --
 .../flink/stormcompatibility/api/TestSpout.java |  59 --
 .../api/TestTopologyBuilder.java                |  27 -
 .../stormcompatibility/util/AbstractTest.java   |  39 --
 .../util/FiniteTestSpout.java                   |  77 ---
 .../util/FlinkOutputFieldsDeclarerTest.java     | 193 ------
 .../util/FlinkStormStreamSelectorTest.java      |  51 --
 .../util/FlinkTopologyContextTest.java          | 114 ----
 .../stormcompatibility/util/TestDummyBolt.java  |  71 --
 .../stormcompatibility/util/TestDummySpout.java |  79 ---
 .../flink/stormcompatibility/util/TestSink.java |  60 --
 .../wrappers/FiniteStormSpoutWrapperTest.java   |  76 ---
 .../wrappers/SetupOutputFieldsDeclarerTest.java |  91 ---
 .../wrappers/StormBoltCollectorTest.java        | 105 ---
 .../wrappers/StormBoltWrapperTest.java          | 339 ----------
 .../wrappers/StormFiniteSpoutWrapperTest.java   | 117 ----
 .../wrappers/StormSpoutCollectorTest.java       |  94 ---
 .../wrappers/StormSpoutWrapperTest.java         | 120 ----
 .../wrappers/StormTupleTest.java                | 659 ------------------
 .../wrappers/StormWrapperSetupHelperTest.java   | 315 ---------
 .../wrappers/TestContext.java                   |  56 --
 .../src/test/resources/log4j-test.properties    |  27 -
 .../src/test/resources/log4j.properties         |  27 -
 .../src/test/resources/logback-test.xml         |  30 -
 .../README.md                                   |  20 -
 .../flink-storm-compatibility-examples/pom.xml  | 362 ----------
 .../excamation/ExclamationTopology.java         | 128 ----
 .../excamation/ExclamationWithStormBolt.java    | 143 ----
 .../excamation/ExclamationWithStormSpout.java   | 150 -----
 .../excamation/StormExclamationLocal.java       |  75 ---
 .../StormExclamationRemoteByClient.java         |  83 ---
 .../StormExclamationRemoteBySubmitter.java      |  81 ---
 .../stormoperators/ExclamationBolt.java         |  75 ---
 .../singlejoin/SingleJoinTopology.java          |  90 ---
 .../singlejoin/StormSingleJoinLocal.java        |  50 --
 .../singlejoin/stormoperators/AgeSpout.java     |  54 --
 .../singlejoin/stormoperators/GenderSpout.java  |  47 --
 .../stormoperators/SingleJoinBolt.java          | 132 ----
 .../split/SpoutSplitExample.java                | 102 ---
 .../split/stormoperators/RandomSpout.java       |  76 ---
 .../stormoperators/VerifyAndEnrichBolt.java     |  61 --
 .../util/AbstractStormBoltSink.java             |  76 ---
 .../util/AbstractStormSpout.java                |  70 --
 .../util/FiniteStormFileSpout.java              |  77 ---
 .../util/FiniteStormInMemorySpout.java          |  40 --
 .../util/OutputFormatter.java                   |  36 -
 .../util/SimpleOutputFormatter.java             |  42 --
 .../util/StormBoltFileSink.java                 |  76 ---
 .../util/StormBoltPrintSink.java                |  45 --
 .../stormcompatibility/util/StormFileSpout.java |  88 ---
 .../util/StormInMemorySpout.java                |  42 --
 .../util/TupleOutputFormatter.java              |  38 --
 .../wordcount/BoltTokenizerWordCount.java       | 122 ----
 .../wordcount/BoltTokenizerWordCountPojo.java   | 135 ----
 .../BoltTokenizerWordCountWithNames.java        | 138 ----
 .../wordcount/SpoutSourceWordCount.java         | 157 -----
 .../wordcount/StormWordCountLocal.java          |  75 ---
 .../wordcount/StormWordCountNamedLocal.java     |  76 ---
 .../wordcount/StormWordCountRemoteByClient.java |  85 ---
 .../StormWordCountRemoteBySubmitter.java        |  83 ---
 .../wordcount/WordCountTopology.java            | 135 ----
 .../stormoperators/StormBoltCounter.java        |  88 ---
 .../stormoperators/StormBoltCounterByName.java  |  88 ---
 .../stormoperators/StormBoltTokenizer.java      |  78 ---
 .../StormBoltTokenizerByName.java               |  78 ---
 .../stormoperators/StormWordCountFileSpout.java |  39 --
 .../StormWordCountInMemorySpout.java            |  40 --
 .../stormoperators/WordCountDataPojos.java      |  59 --
 .../stormoperators/WordCountDataTuple.java      |  34 -
 .../stormcompatibility/api/StormTestBase.java   | 117 ----
 .../ExclamationWithStormBoltITCase.java         |  49 --
 .../ExclamationWithStormSpoutITCase.java        |  46 --
 .../StormExclamationLocalITCase.java            |  48 --
 .../exclamation/util/ExclamationData.java       |  98 ---
 .../split/BoltSplitITCase.java                  |  28 -
 .../stormcompatibility/split/SplitBolt.java     |  61 --
 .../split/SplitBoltTopology.java                |  87 ---
 .../split/SplitSpoutTopology.java               |  85 ---
 .../split/SpoutSplitITCase.java                 |  28 -
 .../split/StormSplitStreamBoltLocal.java        |  51 --
 .../split/StormSplitStreamSpoutLocal.java       |  51 --
 .../wordcount/BoltTokenizerWordCountITCase.java |  45 --
 .../BoltTokenizerWordCountPojoITCase.java       |  45 --
 .../BoltTokenizerWordCountWithNamesITCase.java  |  45 --
 .../wordcount/SpoutSourceWordCountITCase.java   |  45 --
 .../wordcount/StormWordCountLocalITCase.java    |  45 --
 .../StormWordCountLocalNamedITCase.java         |  45 --
 .../src/test/resources/log4j-test.properties    |  27 -
 .../src/test/resources/log4j.properties         |  27 -
 .../src/test/resources/logback-test.xml         |  30 -
 flink-contrib/flink-storm-compatibility/pom.xml |  40 --
 flink-contrib/flink-storm-examples/README.md    |  20 +
 flink-contrib/flink-storm-examples/pom.xml      | 364 ++++++++++
 .../storm/excamation/ExclamationLocal.java      |  75 +++
 .../storm/excamation/ExclamationTopology.java   | 123 ++++
 .../storm/excamation/ExclamationWithBolt.java   | 144 ++++
 .../storm/excamation/ExclamationWithSpout.java  | 150 +++++
 .../excamation/operators/ExclamationBolt.java   |  75 +++
 .../flink/storm/split/SpoutSplitExample.java    | 102 +++
 .../storm/split/operators/RandomSpout.java      |  76 +++
 .../split/operators/VerifyAndEnrichBolt.java    |  61 ++
 .../flink/storm/util/AbstractBoltSink.java      |  76 +++
 .../flink/storm/util/AbstractLineSpout.java     |  70 ++
 .../apache/flink/storm/util/BoltFileSink.java   |  76 +++
 .../apache/flink/storm/util/BoltPrintSink.java  |  45 ++
 .../org/apache/flink/storm/util/FileSpout.java  |  88 +++
 .../flink/storm/util/FiniteFileSpout.java       |  77 +++
 .../flink/storm/util/FiniteInMemorySpout.java   |  40 ++
 .../apache/flink/storm/util/InMemorySpout.java  |  42 ++
 .../flink/storm/util/OutputFormatter.java       |  37 ++
 .../flink/storm/util/SimpleOutputFormatter.java |  41 ++
 .../flink/storm/util/TupleOutputFormatter.java  |  38 ++
 .../storm/wordcount/BoltTokenizerWordCount.java | 122 ++++
 .../wordcount/BoltTokenizerWordCountPojo.java   | 134 ++++
 .../BoltTokenizerWordCountWithNames.java        | 137 ++++
 .../storm/wordcount/SpoutSourceWordCount.java   | 157 +++++
 .../flink/storm/wordcount/WordCountLocal.java   |  76 +++
 .../storm/wordcount/WordCountLocalByName.java   |  77 +++
 .../wordcount/WordCountRemoteByClient.java      |  86 +++
 .../wordcount/WordCountRemoteBySubmitter.java   |  84 +++
 .../storm/wordcount/WordCountTopology.java      | 136 ++++
 .../storm/wordcount/operators/BoltCounter.java  |  90 +++
 .../wordcount/operators/BoltCounterByName.java  |  90 +++
 .../wordcount/operators/BoltTokenizer.java      |  78 +++
 .../operators/BoltTokenizerByName.java          |  78 +++
 .../wordcount/operators/WordCountDataPojos.java |  59 ++
 .../wordcount/operators/WordCountDataTuple.java |  34 +
 .../wordcount/operators/WordCountFileSpout.java |  39 ++
 .../operators/WordCountInMemorySpout.java       |  40 ++
 .../exclamation/ExclamationWithBoltITCase.java  |  49 ++
 .../exclamation/ExclamationWithSpoutITCase.java |  46 ++
 .../StormExclamationLocalITCase.java            |  48 ++
 .../storm/exclamation/util/ExclamationData.java |  98 +++
 .../flink/storm/split/BoltSplitITCase.java      |  28 +
 .../org/apache/flink/storm/split/SplitBolt.java |  61 ++
 .../flink/storm/split/SplitBoltTopology.java    |  87 +++
 .../flink/storm/split/SplitSpoutTopology.java   |  85 +++
 .../flink/storm/split/SplitStreamBoltLocal.java |  51 ++
 .../storm/split/SplitStreamSpoutLocal.java      |  51 ++
 .../flink/storm/split/SpoutSplitITCase.java     |  28 +
 .../apache/flink/storm/util/StormTestBase.java  | 117 ++++
 .../wordcount/BoltTokenizerWordCountITCase.java |  46 ++
 .../BoltTokenizerWordCountPojoITCase.java       |  46 ++
 .../BoltTokenizerWordCountWithNamesITCase.java  |  46 ++
 .../wordcount/SpoutSourceWordCountITCase.java   |  46 ++
 .../storm/wordcount/WordCountLocalITCase.java   |  46 ++
 .../wordcount/WordCountLocalNamedITCase.java    |  46 ++
 .../src/test/resources/log4j-test.properties    |  27 +
 .../src/test/resources/log4j.properties         |  27 +
 .../src/test/resources/logback-test.xml         |  30 +
 flink-contrib/flink-storm/README.md             |  15 +
 flink-contrib/flink-storm/pom.xml               | 114 ++++
 .../org/apache/flink/storm/api/FlinkClient.java | 315 +++++++++
 .../flink/storm/api/FlinkLocalCluster.java      | 173 +++++
 .../storm/api/FlinkOutputFieldsDeclarer.java    | 168 +++++
 .../apache/flink/storm/api/FlinkSubmitter.java  | 194 ++++++
 .../apache/flink/storm/api/FlinkTopology.java   |  89 +++
 .../flink/storm/api/FlinkTopologyBuilder.java   | 397 +++++++++++
 .../apache/flink/storm/util/FiniteSpout.java    |  36 +
 .../flink/storm/util/SplitStreamMapper.java     |  39 ++
 .../flink/storm/util/SplitStreamType.java       |  52 ++
 .../storm/util/SplitStreamTypeKeySelector.java  |  46 ++
 .../apache/flink/storm/util/StormConfig.java    | 123 ++++
 .../flink/storm/util/StormStreamSelector.java   |  48 ++
 .../storm/wrappers/AbstractStormCollector.java  | 143 ++++
 .../flink/storm/wrappers/BoltCollector.java     |  89 +++
 .../flink/storm/wrappers/BoltWrapper.java       | 260 ++++++++
 .../storm/wrappers/FlinkTopologyContext.java    | 165 +++++
 .../wrappers/SetupOutputFieldsDeclarer.java     |  66 ++
 .../flink/storm/wrappers/SpoutCollector.java    |  81 +++
 .../flink/storm/wrappers/SpoutWrapper.java      | 274 ++++++++
 .../apache/flink/storm/wrappers/StormTuple.java | 327 +++++++++
 .../storm/wrappers/WrapperSetupHelper.java      | 266 ++++++++
 .../api/FlinkOutputFieldsDeclarerTest.java      | 194 ++++++
 .../storm/api/FlinkTopologyBuilderTest.java     |  77 +++
 .../flink/storm/api/FlinkTopologyTest.java      |  63 ++
 .../org/apache/flink/storm/api/TestBolt.java    |  48 ++
 .../org/apache/flink/storm/api/TestSpout.java   |  59 ++
 .../flink/storm/api/TestTopologyBuilder.java    |  29 +
 .../apache/flink/storm/util/AbstractTest.java   |  39 ++
 .../flink/storm/util/FiniteTestSpout.java       |  77 +++
 .../storm/util/StormStreamSelectorTest.java     |  51 ++
 .../apache/flink/storm/util/TestDummyBolt.java  |  74 +++
 .../apache/flink/storm/util/TestDummySpout.java |  82 +++
 .../org/apache/flink/storm/util/TestSink.java   |  60 ++
 .../flink/storm/wrappers/BoltCollectorTest.java |  87 +++
 .../flink/storm/wrappers/BoltWrapperTest.java   | 355 ++++++++++
 .../wrappers/FlinkTopologyContextTest.java      | 115 ++++
 .../wrappers/SetupOutputFieldsDeclarerTest.java |  92 +++
 .../storm/wrappers/SpoutCollectorTest.java      |  88 +++
 .../flink/storm/wrappers/SpoutWrapperTest.java  | 220 +++++++
 .../flink/storm/wrappers/StormTupleTest.java    | 660 +++++++++++++++++++
 .../flink/storm/wrappers/TestContext.java       |  56 ++
 .../storm/wrappers/WrapperSetupHelperTest.java  | 317 +++++++++
 .../src/test/resources/log4j-test.properties    |  27 +
 .../src/test/resources/log4j.properties         |  27 +
 .../src/test/resources/logback-test.xml         |  30 +
 flink-contrib/pom.xml                           |   3 +-
 228 files changed, 10902 insertions(+), 11912 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index d676db8..bf80d4e 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -36,33 +36,34 @@ This document shows how to use existing Storm code with Flink.
 
 # Project Configuration
 
-Support for Storm is contained in the `flink-storm-compatibility-core` Maven module.
-The code resides in the `org.apache.flink.stormcompatibility` package.
+Support for Storm is contained in the `flink-storm` Maven module.
+The code resides in the `org.apache.flink.storm` package.
 
 Add the following dependency to your `pom.xml` if you want to execute Storm code in Flink.
 
 ~~~xml
 <dependency>
 	<groupId>org.apache.flink</groupId>
-	<artifactId>flink-storm-compatibility-core</artifactId>
+	<artifactId>flink-storm</artifactId>
 	<version>{{site.version}}</version>
 </dependency>
 ~~~
 
-**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution.
-Thus, you need to include `flink-storm-compatiblitly-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
-See *WordCount Storm* within `flink-storm-compatibility-example/pom.xml` for an example how to package a jar correctly.
+**Please note**: `flink-storm` is not part of the provided binary Flink distribution.
+Thus, you need to include `flink-storm` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
+See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example of how to package a jar correctly.
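+
+For illustration, a minimal sketch of such a packaging step using the `maven-shade-plugin` (an assumption for illustration; the example pom may use a different plugin or configuration):
+
+~~~xml
+<plugin>
+	<groupId>org.apache.maven.plugins</groupId>
+	<artifactId>maven-shade-plugin</artifactId>
+	<executions>
+		<execution>
+			<!-- bundles flink-storm and its dependencies into the submitted job jar -->
+			<phase>package</phase>
+			<goals><goal>shade</goal></goals>
+		</execution>
+	</executions>
+</plugin>
+~~~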
 
 # Execute Storm Topologies
 
-Flink provides a Storm compatible API (`org.apache.flink.stormcompatibility.api`) that offers replacements for the following classes:
+Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:
 
 - `TopologyBuilder` replaced by `FlinkTopologyBuilder`
 - `StormSubmitter` replaced by `FlinkSubmitter`
 - `NimbusClient` and `Client` replaced by `FlinkClient`
 - `LocalCluster` replaced by `FlinkLocalCluster`
 
-In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classed with their Flink replacements in the original Storm client code that assembles the topology.
+In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
+The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*.
 If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively.
 If a parameter is not specified, the value is taken from `flink-conf.yaml`.
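+
+For illustration, a minimal sketch of a remote submission (assuming `FlinkSubmitter` and `FlinkTopologyBuilder` mirror the signatures of their Storm counterparts; host name and topology name are placeholders):
+
+~~~java
+Config conf = new Config();
+conf.put(Config.NIMBUS_HOST, "jobManagerHost"); // used as jobmanager.rpc.address
+conf.put(Config.NIMBUS_THRIFT_PORT, 6123);      // used as jobmanager.rpc.port
+
+FlinkSubmitter.submitTopology("myTopology", conf, builder.createTopology());
+~~~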
 
@@ -71,10 +72,11 @@ If a parameter is not specified, the value is taken from `flink-conf.yaml`.
 ~~~java
 FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new TopologyBuilder();
 
-builder.setSpout("source", new StormFileSpout(inputFilePath));
-builder.setBolt("tokenizer", new StormBoltTokenizer()).shuffleGrouping("source");
-builder.setBolt("counter", new StormBoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
-builder.setBolt("sink", new StormBoltFileSink(outputFilePath)).shuffleGrouping("counter");
+// actual topology assembling code and used Spouts/Bolts can be used as-is
+builder.setSpout("source", new FileSpout(inputFilePath));
+builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
+builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
+builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
 
 Config conf = new Config();
 if(runLocal) { // submit to test cluster
@@ -93,7 +95,7 @@ if(runLocal) { // submit to test cluster
 # Embed Storm Operators in Flink Streaming Programs 
 
 As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
-The Storm compatibility layer offers a wrapper classes for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
+The Storm compatibility layer offers a wrapper class for each, namely `SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).
 
 Per default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple0` to `Tuple25` according to the number of fields of the Storm tuples).
 For single field output tuples a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).
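+
+For illustration, the two corresponding ways to obtain the matching `TypeInformation` object (both calls also appear in the examples below):
+
+~~~java
+// output type for Storm output tuples with two fields
+TypeInformation<Tuple2<String, Integer>> tupleType =
+	TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0));
+
+// output type for single field output tuples emitted as raw type
+TypeInformation<String> rawType = TypeExtractor.getForClass(String.class);
+~~~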
@@ -104,7 +106,7 @@ In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` ca
 ## Embed Spouts
 
 In order to use a Spout as a Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
-The Spout object is handed to the constructor of `StormSpoutWrapper<OUT>` that serves as first argument to `addSource(...)`.
+The Spout object is handed to the constructor of `SpoutWrapper<OUT>` that serves as the first argument to `addSource(...)`.
 The generic type declaration `OUT` specifies the type of the source output stream.
 
 <div class="codetabs" markdown="1">
@@ -114,7 +116,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
 
 // stream has `raw` type (single field output streams only)
 DataStream<String> rawInput = env.addSource(
-	new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
+	new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
 	TypeExtractor.getForClass(String.class)); // output type
 
 // process data stream
@@ -123,15 +125,15 @@ DataStream<String> rawInput = env.addSource(
 </div>
 </div>
 
-If a Spout emits a finite number of tuples, `StormFiniteSpoutWrapper` can be used instead of `StormSpoutWrapper`.
-Using `StormFiniteSpoutWrapper` allows the Flink program to shut down automatically after all data is processed.
-If `StormSpoutWrapper` is used, the program will run until it is [canceled](cli.html) manually.
+If a Spout emits a finite number of tuples, `SpoutWrapper` can be configured to terminate automatically by setting the `numberOfInvocations` parameter in its constructor.
+This allows the Flink program to shut down automatically after all data is processed.
+Per default, the program will run until it is [canceled](cli.html) manually.
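+
+For illustration, a minimal sketch (assuming a `SpoutWrapper` constructor overload that takes the Spout and the number of `nextTuple()` invocations):
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+// calls nextTuple() on the Spout exactly 10 times, then terminates the source
+DataStream<String> finiteInput = env.addSource(
+	new SpoutWrapper<String>(new FileSpout(localFilePath), 10),
+	TypeExtractor.getForClass(String.class));
+~~~
+</div>
+</div>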
 
 
 ## Embed Bolts
 
 In order to use a Bolt as a Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
-The Bolt object is handed to the constructor of `StormBoltWrapper<IN,OUT>` that serves as last argument to `transform(...)`.
+The Bolt object is handed to the constructor of `BoltWrapper<IN,OUT>` that serves as the last argument to `transform(...)`.
 The generic type declarations `IN` and `OUT` specify the type of the operator's input and output stream, respectively.
 
 <div class="codetabs" markdown="1">
@@ -143,7 +145,7 @@ DataStream<String> text = env.readTextFile(localFilePath);
 DataStream<Tuple2<String, Integer>> counts = text.transform(
 	"tokenizer", // operator name
 	TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
-	new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer())); // Bolt operator
+	new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator
 
 // do further processing
 [...]
@@ -164,16 +166,16 @@ For this case, Flink expects either a corresponding public member variable or pu
 For example, if a Bolt accesses a field via name `sentence` (eg, `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or method `public String getSentence() { ... };` (pay attention to camel-case naming).
 
 For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
-For this case, the constructor of `StormBoltWrapper` takes an additional argument: `new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(new StormBoltTokenizerByName(), new Fields("sentence"))`.
+For this case, the constructor of `BoltWrapper` takes an additional argument: `new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))`.
 The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
 
-See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
+See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
 
 ## Configuring Spouts and Bolts
 
 In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is given to `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
 This `Map` is provided by the user alongside the topology and gets forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
-If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.
 
 For embedded usage, Flink's configuration mechanism must be used.
 A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
@@ -202,12 +204,12 @@ env.getConfig().setGlobalJobParameters(config);
 ## Multiple Output Streams
 
 Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
-If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.
 
-For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitDataStream.select(...)`.
-Flink provides the predefined output selector `FlinkStormStreamSelector<T>` for `.split(...)` already.
+For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
+Flink already provides the predefined output selector `StormStreamSelector<T>` for `.split(...)`.
 Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
-If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, `SplitStreamTuple<T>` must **not** be removed &ndash; `StormBoltWrapper` removes it automatically.
+If a data stream of type `SplitStreamType<T>` is used as input for a Bolt, it is **not** required to strip the wrapper &ndash; `BoltWrapper` removes it automatically.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -217,11 +219,11 @@ If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, `Spl
 // get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
 DataStream<SplitStreamType<SomeType>> multiStream = ...
 
-SplitDataStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new FlinkStormStreamSelector<SomeType>());
+SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
 
 // use SplitStreamMapper to remove the SplitStreamType wrapper and get a data stream of type SomeType
 DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
-// apply Bolt directly, without stripping SplitStreamMapper
+// apply Bolt directly, without stripping SplitStreamType
 DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt for further processing */);
 
 // do further processing on s1 and s2
@@ -230,67 +232,48 @@ DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt f
 </div>
 </div>
 
-See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
+See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java) for a full example.
 
 # Flink Extensions
 
-## Finite Storm Spouts
+## Finite Spouts
 
-In Flink streaming, sources can be finite &ndash; i.e., emit a finite number of records and stop after emitting the last record &ndash;, however, Storm spouts always emit infinite streams.
-The bridge between the two approach is the `FiniteStormSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can specify a stopping-condition.
-The user can create a finite Storm spout by implementing this interface instead of `IRichSpout`, and implementing the `reachedEnd()`method in addition.
-When used as part of a Flink topology, a `FiniteStormSpout` should be wrapped by `FiniteStormSpoutWrapper`.
+In Flink, streaming sources can be finite, ie, emit a finite number of records and stop after emitting the last record. However, Spouts usually emit infinite streams.
+The bridge between the two approaches is the `FiniteSpout` interface which, in addition to `IRichSpout`, contains a `reachedEnd()` method in which the user can specify a stopping condition.
+The user can create a finite Spout by implementing this interface instead of (or in addition to) `IRichSpout` and implementing the `reachedEnd()` method.
+In contrast to a `SpoutWrapper` that is configured to emit a finite number of tuples, the `FiniteSpout` interface allows the implementation of more complex termination criteria.
 
-Although finite Storm spouts are not necessary to embed Storm spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy:
+Although finite Spouts are not necessary to embed Spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy:
 
- * to achieve that a native Storm spout behaves the same way as a finite Flink source with minimal modifications
- * the user wants to process a stream only for some time; after that, the spout can stop automatically
+ * to make a native Spout behave the same way as a finite Flink source with minimal modifications
+ * the user wants to process a stream only for some time; after that, the Spout can stop automatically
  * reading a file into a stream
  * for testing purposes
 
-A `FiniteStormSpout` can be still used as a normal, infinite Storm spout by changing its wrapper class to `StormSpoutWraper` in the Flink topology.
-
-An example of a finite Storm spout that emits records for 10 seconds only:
+An example of a finite Spout that emits records for 10 seconds only:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 ~~~java
-public class TimedFiniteStormSpout extends AbstractStormSpout implements FiniteStormSpout {
-	[...]
+public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
+	[...] // implement open(), nextTuple(), ...
+
 	private long starttime = System.currentTimeMillis();
 
 	public boolean reachedEnd() {
 		return System.currentTimeMillis() - starttime > 10000L;
 	}
-	[...]
 }
 ~~~
 </div>
 </div>
 
-Using a `FiniteStormSpout` in a Flink topology:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-~~~java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> rawInput = env.addSource(
-	new FiniteStormSpoutWrapper<String>(new TimedFiniteStormSpout(), true)
-	TypeExtractor.getForClass(String.class));
-
-// process data stream
-[...]
-~~~
-</div>
-</div>
-
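+Using a finite Spout in a Flink streaming program (a sketch, assuming the wrapped Spout is stopped by `SpoutWrapper` once `reachedEnd()` returns true):
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// the source terminates as soon as reachedEnd() returns true
+DataStream<String> rawInput = env.addSource(
+	new SpoutWrapper<String>(new TimedFiniteSpout()),
+	TypeExtractor.getForClass(String.class));
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+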
 # Storm Compatibility Examples
 
-You can find more examples in Maven module `flink-storm-compatibilty-examples`.
-For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md).
+You can find more examples in Maven module `flink-storm-examples`.
+For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/README.md).
 To run the examples, you need to assemble a correct jar file.
-`flink-storm-compatibility-examples-0.10-SNAPSHOT.jar` is **no** valid jar file for job execution (it is only a standard maven artifact).
+`flink-storm-examples-0.10-SNAPSHOT.jar` is **not** a valid jar file for job execution (it is only a standard Maven artifact).
 
 There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
 Compare `pom.xml` to see how both jars are built.

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
deleted file mode 100644
index 9663fc7..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ /dev/null
@@ -1,12 +0,0 @@
-# flink-storm-compatibility
-
-The Storm compatibility layer allows to embed spouts or bolt unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself, can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
-
-The following Strom features are not (yet/fully) supported by the compatibility layer right now:
-* tuple meta information
-* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
-* for whole Storm topologies the following is not supported by Flink:
-  * direct emit connection pattern
-  * activating/deactivating and rebalancing of topologies
-  * task hooks
-  * custom metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
deleted file mode 100644
index cced678..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/pom.xml
+++ /dev/null
@@ -1,114 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-storm-compatibility-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-storm-compatibility-core</artifactId>
-	<name>flink-storm-compatibility-core</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.storm</groupId>
-			<artifactId>storm-core</artifactId>
-			<version>0.9.4</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<artifactId>logback-classic</artifactId>
-					<groupId>ch.qos.logback</groupId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
deleted file mode 100644
index 4676102..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import com.google.common.collect.Lists;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-
-import scala.Some;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
- * Flink's JobManager instead of Storm's Nimbus.
- */
-public class FlinkClient {
-
-	/** The client's configuration */
-	private final Map<?,?> conf;
-	/** The jobmanager's host name */
-	private final String jobManagerHost;
-	/** The jobmanager's rpc port */
-	private final int jobManagerPort;
-	/** The user specified timeout in milliseconds */
-	private final String timeout;
-
-	// The following methods are derived from "backtype.storm.utils.NimbusClient"
-
-	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
-	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
-	 *
-	 * @param conf
-	 * 		A configuration.
-	 * @param host
-	 * 		The jobmanager's host name.
-	 * @param port
-	 * 		The jobmanager's rpc port.
-	 */
-	@SuppressWarnings("rawtypes")
-	public FlinkClient(final Map conf, final String host, final int port) {
-		this(conf, host, port, null);
-	}
-
-	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
-	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
-	 *
-	 * @param conf
-	 * 		A configuration.
-	 * @param host
-	 * 		The jobmanager's host name.
-	 * @param port
-	 * 		The jobmanager's rpc port.
-	 * @param timeout
-	 * 		Timeout
-	 */
-	@SuppressWarnings("rawtypes")
-	public FlinkClient(final Map conf, final String host, final int port, final Integer timeout) {
-		this.conf = conf;
-		this.jobManagerHost = host;
-		this.jobManagerPort = port;
-		if (timeout != null) {
-			this.timeout = timeout + " ms";
-		} else {
-			this.timeout = null;
-		}
-	}
-
-	/**
-	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
-	 * Config#NIMBUS_THRIFT_PORT} as JobManager address.
-	 *
-	 * @param conf
-	 * 		Configuration that contains the jobmanager's hostname and port.
-	 * @return A configured {@link FlinkClient}.
-	 */
-	@SuppressWarnings("rawtypes")
-	public static FlinkClient getConfiguredClient(final Map conf) {
-		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
-		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
-		return new FlinkClient(conf, nimbusHost, nimbusPort);
-	}
-
-	/**
-	 * Return a reference to itself.
-	 * <p/>
-	 * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once.
-	 *
-	 * @return A reference to itself.
-	 */
-	public FlinkClient getClient() {
-		return this;
-	}
-
-	// The following methods are derived from "backtype.storm.generated.Nimubs.Client"
-
-	/**
-	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
-	 * uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
-	 */
-	public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
-	}
-
-	/**
-	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
-	 * uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
-	 */
-	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
-			topology)
-					throws AlreadyAliveException, InvalidTopologyException {
-
-		if (this.getTopologyJobId(name) != null) {
-			throw new AlreadyAliveException();
-		}
-
-		final File uploadedJarFile = new File(uploadedJarLocation);
-		try {
-			JobWithJars.checkJarFile(uploadedJarFile);
-		} catch (final IOException e) {
-			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
-		}
-
-		/* set storm configuration */
-		if (this.conf != null) {
-			topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
-		}
-
-		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
-		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
-
-		final Configuration configuration = jobGraph.getJobConfiguration();
-		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
-		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-
-		final Client client;
-		try {
-			client = new Client(configuration);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not establish a connection to the job manager", e);
-		}
-
-		try {
-			ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
-					Lists.newArrayList(uploadedJarFile),
-					this.getClass().getClassLoader());
-			client.runDetached(jobGraph, classLoader);
-		} catch (final ProgramInvocationException e) {
-			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
-		}
-	}
-
-	public void killTopology(final String name) throws NotAliveException {
-		this.killTopologyWithOpts(name, null);
-	}
-
-	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
-		final JobID jobId = this.getTopologyJobId(name);
-		if (jobId == null) {
-			throw new NotAliveException();
-		}
-
-		try {
-			final ActorRef jobManager = this.getJobManager();
-
-			if (options != null) {
-				try {
-					Thread.sleep(1000 * options.get_wait_secs());
-				} catch (final InterruptedException e) {
-					throw new RuntimeException(e);
-				}
-			}
-
-			final FiniteDuration askTimeout = this.getTimeout();
-			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
-			try {
-				Await.result(response, askTimeout);
-			} catch (final Exception e) {
-				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
-			}
-		} catch (final IOException e) {
-			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-					+ ":" + this.jobManagerPort, e);
-		}
-	}
-
-	// Flink specific additional methods
-
-	/**
-	 * Package internal method to get a Flink {@link JobID} from a Storm topology name.
-	 *
-	 * @param id
-	 * 		The Storm topology name.
-	 * @return Flink's internally used {@link JobID}.
-	 */
-	JobID getTopologyJobId(final String id) {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-		if (this.timeout != null) {
-			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
-		}
-
-		try {
-			final ActorRef jobManager = this.getJobManager();
-
-			final FiniteDuration askTimeout = this.getTimeout();
-			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
-					new Timeout(askTimeout));
-
-			Object result;
-			try {
-				result = Await.result(response, askTimeout);
-			} catch (final Exception e) {
-				throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
-			}
-
-			if (result instanceof RunningJobsStatus) {
-				final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
-
-				for (final JobStatusMessage status : jobs) {
-					if (status.getJobName().equals(id)) {
-						return status.getJobId();
-					}
-				}
-			} else {
-				throw new RuntimeException("ReqeustRunningJobs requires a response of type "
-						+ "RunningJobs. Instead the response is of type " + result.getClass() + ".");
-			}
-		} catch (final IOException e) {
-			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-					+ ":" + this.jobManagerPort, e);
-		}
-
-		return null;
-	}
-
-	private FiniteDuration getTimeout() {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-		if (this.timeout != null) {
-			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
-		}
-
-		return AkkaUtils.getTimeout(configuration);
-	}
-
-	private ActorRef getJobManager() throws IOException {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-
-		ActorSystem actorSystem;
-		try {
-			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
-			actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
-					systemEndpoint));
-		} catch (final Exception e) {
-			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
-		}
-
-		return JobManager.getJobManagerActorRef(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
-				actorSystem, AkkaUtils.getLookupTimeout(configuration));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
deleted file mode 100644
index 9b3fb54..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
- */
-public class FlinkLocalCluster {
-
-	/** The log used by this mini cluster */
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
-
-	/** The flink mini cluster on which to execute the programs */
-	private final FlinkMiniCluster flink;
-
-
-	public FlinkLocalCluster() {
-		this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
-		this.flink.start();
-	}
-
-	public FlinkLocalCluster(FlinkMiniCluster flink) {
-		this.flink = Objects.requireNonNull(flink);
-	}
-
-	@SuppressWarnings("rawtypes")
-	public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
-			throws Exception {
-		this.submitTopologyWithOpts(topologyName, conf, topology, null);
-	}
-
-	@SuppressWarnings("rawtypes")
-	public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
-		LOG.info("Running Storm topology on FlinkLocalCluster");
-
-		if(conf != null) {
-			topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
-		}
-
-		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
-		this.flink.submitJobDetached(jobGraph);
-	}
-
-	public void killTopology(final String topologyName) {
-		this.killTopologyWithOpts(topologyName, null);
-	}
-
-	public void killTopologyWithOpts(final String name, final KillOptions options) {
-	}
-
-	public void activate(final String topologyName) {
-	}
-
-	public void deactivate(final String topologyName) {
-	}
-
-	public void rebalance(final String name, final RebalanceOptions options) {
-	}
-
-	public void shutdown() {
-		flink.stop();
-	}
-
-	public String getTopologyConf(final String id) {
-		return null;
-	}
-
-	public StormTopology getTopology(final String id) {
-		return null;
-	}
-
-	public ClusterSummary getClusterInfo() {
-		return null;
-	}
-
-	public TopologyInfo getTopologyInfo(final String id) {
-		return null;
-	}
-
-	public Map<?, ?> getState() {
-		return null;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Access to default local cluster
-	// ------------------------------------------------------------------------
-
-	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
-	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
-
-	/**
-	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
-	 * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
-	 *
-	 * @return a {@link FlinkLocalCluster} to be used for execution
-	 */
-	public static FlinkLocalCluster getLocalCluster() {
-		return currentFactory.createLocalCluster();
-	}
-
-	/**
-	 * Sets a different factory for FlinkLocalClusters to be used for execution.
-	 *
-	 * @param clusterFactory
-	 * 		The LocalClusterFactory to create the local clusters for execution.
-	 */
-	public static void initialize(LocalClusterFactory clusterFactory) {
-		currentFactory = Objects.requireNonNull(clusterFactory);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Cluster factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A factory that creates local clusters.
-	 */
-	public static interface LocalClusterFactory {
-
-		/**
-		 * Creates a local Flink cluster.
-		 * @return A local Flink cluster.
-		 */
-		FlinkLocalCluster createLocalCluster();
-	}
-
-	/**
-	 * A factory that instantiates a FlinkLocalCluster.
-	 */
-	public static class DefaultLocalClusterFactory implements LocalClusterFactory {
-
-		@Override
-		public FlinkLocalCluster createLocalCluster() {
-			return new FlinkLocalCluster();
-		}
-	}
-}
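
For context, a minimal usage sketch of the FlinkLocalCluster API deleted above; ExampleSpout and ExampleBolt are hypothetical placeholder components, not part of this commit:

    import backtype.storm.Config;
    import backtype.storm.utils.Utils;

    import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
    import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

    public class LocalClusterSketch {
        public static void main(final String[] args) throws Exception {
            // assemble a topology from hypothetical user components
            final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
            builder.setSpout("source", new ExampleSpout());
            builder.setBolt("sink", new ExampleBolt()).shuffleGrouping("source");

            // fetch the (possibly factory-provided) local cluster, run briefly, shut down
            final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
            cluster.submitTopology("sketch", new Config(), builder.createTopology());
            Utils.sleep(10 * 1000);
            cluster.shutdown();
        }
    }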

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
deleted file mode 100644
index 5f3f31e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-
-/**
- * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
- */
-public class FlinkSubmitter {
-	public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
-	 *
-	 * @param name
-	 * 		the name of the topology.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @param opts
-	 * 		to manipulate the starting of the topology.
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
-			final SubmitOptions opts)
-					throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology);
-	}
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. No {@link
-	 * FlinkProgressListener} can be given, because progress bars are not supported by Flink.
-	 *
-	 * @param name
-	 * 		the name of the topology.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		if (!Utils.isValidConf(stormConf)) {
-			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
-		}
-
-		final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
-		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
-			stormConf.put(Config.NIMBUS_HOST,
-					flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
-		}
-		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
-			stormConf.put(Config.NIMBUS_THRIFT_PORT,
-					new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-							6123)));
-		}
-
-		final String serConf = JSONValue.toJSONString(stormConf);
-
-		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
-		try {
-			if (client.getTopologyJobId(name) != null) {
-				throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
-			}
-			String localJar = System.getProperty("storm.jar");
-			if (localJar == null) {
-				try {
-					for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
-							.getJars()) {
-						// TODO verify that there is only one jar
-						localJar = file.getAbsolutePath();
-					}
-				} catch (final ClassCastException e) {
-					// ignore
-				}
-			}
-
-			logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
-			client.submitTopologyWithOpts(name, localJar, topology);
-		} catch (final InvalidTopologyException e) {
-			logger.warn("Topology submission exception: " + e.get_msg());
-			throw e;
-		} catch (final AlreadyAliveException e) {
-			logger.warn("Topology already alive exception", e);
-			throw e;
-		}
-
-		logger.info("Finished submitting topology: " + name);
-	}
-
-	/**
-	 * Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
-	 * Flink.
-	 *
-	 * @param name
-	 * 		the name of the topology.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
-			final FlinkTopology topology)
-					throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology);
-	}
-
-	/**
-	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
-	 * returned value is the localJar parameter, because this gives the best integration of Storm behavior within a
-	 * Flink environment.
-	 *
-	 * @param conf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param localJar
-	 * 		file path of the jar file to submit
-	 * @return the value of parameter localJar
-	 */
-	@SuppressWarnings("rawtypes")
-	public static String submitJar(final Map conf, final String localJar) {
-		return submitJar(localJar);
-	}
-
-	/**
-	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
-	 * returned value is the localJar parameter, because this gives the best integration of Storm behavior within a
-	 * Flink environment.
-	 *
-	 * @param localJar
-	 * 		file path of the jar file to submit
-	 * @return the value of parameter localJar
-	 */
-	public static String submitJar(final String localJar) {
-		if (localJar == null) {
-			throw new RuntimeException(
-					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
-					"to upload");
-		}
-
-		return localJar;
-	}
-
-	/**
-	 * Dummy interface used to track the progress of file uploads. Does nothing. Kept for compatibility.
-	 */
-	public interface FlinkProgressListener {
-	}
-
-}
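
Similarly, a hedged sketch of remote submission through the FlinkSubmitter class deleted above; host and port are example values, and when the keys are absent, submitTopology() falls back to the JobManager address and port from the Flink configuration, as the code shows:

    import backtype.storm.Config;

    import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
    import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

    public class RemoteSubmitSketch {
        public static void main(final String[] args) throws Exception {
            // ExampleSpout/ExampleBolt are again hypothetical placeholders
            final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
            builder.setSpout("source", new ExampleSpout());
            builder.setBolt("sink", new ExampleBolt(), 4).shuffleGrouping("source");

            // point the Storm-style client config at the Flink JobManager
            final Config conf = new Config();
            conf.put(Config.NIMBUS_HOST, "localhost");
            conf.put(Config.NIMBUS_THRIFT_PORT, 6123);

            FlinkSubmitter.submitTopology("sketch", conf, builder.createTopology());
        }
    }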

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
deleted file mode 100644
index 179466e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.StormTopology;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
- * StreamExecutionEnvironment}. In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
- */
-public class FlinkTopology extends StreamExecutionEnvironment {
-
-	/** The number of declared tasks for the whole program (i.e., the sum over all dops) */
-	private int numberOfTasks = 0;
-
-	public FlinkTopology() {
-		// Set default parallelism to 1, to mirror Storm default behavior
-		super.setParallelism(1);
-	}
-
-	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
-	 * FlinkClient}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-				"instead.");
-	}
-
-	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
-	 * FlinkClient}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public JobExecutionResult execute(final String jobName) throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-				"instead.");
-	}
-
-	/**
-	 * Increases the number of declared tasks of this program by the given value.
-	 *
-	 * @param dop
-	 * 		The dop of a new operator that increases the number of overall tasks.
-	 */
-	public void increaseNumberOfTasks(final int dop) {
-		assert (dop > 0);
-		this.numberOfTasks += dop;
-	}
-
-	/**
-	 * Returns the number of required tasks to execute this program.
-	 *
-	 * @return the number of required tasks to execute this program
-	 */
-	public int getNumberOfTasks() {
-		return this.numberOfTasks;
-	}
-
-}
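
A small sketch of the execution restriction documented above (a builder as in the earlier sketches is assumed; ExampleSpout is a hypothetical placeholder):

    import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
    import org.apache.flink.stormcompatibility.api.FlinkTopology;
    import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

    public class ExecuteRestrictionSketch {
        public static void main(final String[] args) throws Exception {
            final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
            builder.setSpout("source", new ExampleSpout()); // hypothetical spout

            final FlinkTopology topology = builder.createTopology();
            try {
                topology.execute(); // always throws, by design
            } catch (final UnsupportedOperationException expected) {
                // a cluster abstraction must run the topology instead
                FlinkLocalCluster.getLocalCluster().submitTopology("sketch", null, topology);
            }
        }
    }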

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
deleted file mode 100644
index d62d56b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.IRichStateSpout;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.util.SplitStreamTypeKeySelector;
-import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
- * topology. Most methods (except {@link #createTopology()}) are copied from the original {@link TopologyBuilder}
- * implementation to ensure equal behavior.<br />
- * <br />
- * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
- */
-public class FlinkTopologyBuilder {
-
-	/** A Storm {@link TopologyBuilder} to build a real Storm topology */
-	private final TopologyBuilder stormBuilder = new TopologyBuilder();
-	/** All user spouts by their ID */
-	private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
-	/** All user bolts by their ID */
-	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
-	/** All declared streams and output schemas by operator ID */
-	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
-	/** All spout and bolt declarers by their ID */
-	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
-	// needs to be a class member for internal testing purposes
-	private StormTopology stormTopology;
-
-
-	/**
-	 * Creates a Flink program that uses the specified spouts and bolts.
-	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public FlinkTopology createTopology() {
-		this.stormTopology = this.stormBuilder.createTopology();
-
-		final FlinkTopology env = new FlinkTopology();
-		env.setParallelism(1);
-
-		final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
-
-		for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
-			final String spoutId = spout.getKey();
-			final IRichSpout userSpout = spout.getValue();
-
-			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-			userSpout.declareOutputFields(declarer);
-			final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
-			this.outputStreams.put(spoutId, sourceStreams);
-			declarers.put(spoutId, declarer);
-
-			AbstractStormSpoutWrapper spoutWrapper;
-
-			if (userSpout instanceof FiniteStormSpout) {
-				spoutWrapper = new FiniteStormSpoutWrapper((FiniteStormSpout) userSpout);
-			} else {
-				spoutWrapper = new StormSpoutWrapper(userSpout);
-			}
-			spoutWrapper.setStormTopology(stormTopology);
-
-			DataStreamSource source;
-			HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
-			if (sourceStreams.size() == 1) {
-				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
-				source = env.addSource(spoutWrapper, spoutId,
-						declarer.getOutputType(outputStreamId));
-				outputStreams.put(outputStreamId, source);
-			} else {
-				source = env.addSource(spoutWrapper, spoutId,
-						TypeExtractor.getForClass(SplitStreamType.class));
-				SplitStream splitSource = source.split(new FlinkStormStreamSelector());
-
-				for (String streamId : sourceStreams.keySet()) {
-					outputStreams.put(streamId, splitSource.select(streamId));
-				}
-			}
-			availableInputs.put(spoutId, outputStreams);
-
-			int dop = 1;
-			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
-			if (common.is_set_parallelism_hint()) {
-				dop = common.get_parallelism_hint();
-				source.setParallelism(dop);
-			} else {
-				common.set_parallelism_hint(1);
-			}
-			env.increaseNumberOfTasks(dop);
-		}
-
-		final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
-		unprocessedBolts.putAll(this.bolts);
-
-		final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessedInputsPerBolt =
-				new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
-
-		/* Because we do not know the order in which an iterator steps over a set, we might process a consumer
-		 * before its producer; thus, we might need to iterate multiple times.
-		 */
-		boolean makeProgress = true;
-		while (unprocessedBolts.size() > 0) {
-			if (!makeProgress) {
-				throw new RuntimeException(
-						"Unable to build Topology. Could not connect the following bolts: "
-								+ unprocessedBolts.keySet());
-			}
-			makeProgress = false;
-
-			final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
-			while (boltsIterator.hasNext()) {
-
-				final Entry<String, IRichBolt> bolt = boltsIterator.next();
-				final String boltId = bolt.getKey();
-				final IRichBolt userBolt = bolt.getValue();
-
-				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
-
-				Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessedInputsPerBolt.get(boltId);
-				if (unprocessedInputs == null) {
-					unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
-					unprocessedInputs.addAll(common.get_inputs().entrySet());
-					unprocessedInputsPerBolt.put(boltId, unprocessedInputs);
-				}
-
-				// connect each available producer to the current bolt
-				final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
-				while (inputStreamsIterator.hasNext()) {
-
-					final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next();
-					final String producerId = stormInputStream.getKey().get_componentId();
-					final String inputStreamId = stormInputStream.getKey().get_streamId();
-
-					HashMap<String, DataStream> producer = availableInputs.get(producerId);
-					if (producer != null) {
-						makeProgress = true;
-
-						DataStream inputStream = producer.get(inputStreamId);
-						if (inputStream != null) {
-							final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-							userBolt.declareOutputFields(declarer);
-							final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams;
-							this.outputStreams.put(boltId, boltOutputStreams);
-							this.declarers.put(boltId, declarer);
-
-							// if producer was processed already
-							final Grouping grouping = stormInputStream.getValue();
-							if (grouping.is_set_shuffle()) {
-								// Storm uses a round-robin shuffle strategy
-								inputStream = inputStream.rebalance();
-							} else if (grouping.is_set_fields()) {
-								// global grouping is emulated in Storm via an empty fields grouping list
-								final List<String> fields = grouping.get_fields();
-								if (fields.size() > 0) {
-									FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
-									if (producer.size() == 1) {
-										inputStream = inputStream.keyBy(prodDeclarer
-												.getGroupingFieldIndexes(inputStreamId,
-														grouping.get_fields()));
-									} else {
-										inputStream = inputStream
-												.keyBy(new SplitStreamTypeKeySelector(
-														prodDeclarer.getGroupingFieldIndexes(
-																inputStreamId,
-																grouping.get_fields())));
-									}
-								} else {
-									inputStream = inputStream.global();
-								}
-							} else if (grouping.is_set_all()) {
-								inputStream = inputStream.broadcast();
-							} else if (!grouping.is_set_local_or_shuffle()) {
-								throw new UnsupportedOperationException(
-										"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
-							}
-
-							SingleOutputStreamOperator outputStream;
-							StormBoltWrapper boltWrapper;
-							if (boltOutputStreams.size() < 2) { // single output stream or sink
-								String outputStreamId = null;
-								if (boltOutputStreams.size() == 1) {
-									outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
-								}
-								final TypeInformation<?> outType = declarer
-										.getOutputType(outputStreamId);
-
-								boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams
-										.get(producerId).get(inputStreamId));
-								outputStream = inputStream.transform(boltId, outType, boltWrapper);
-
-								if (outType != null) {
-									// only for non-sink nodes
-									HashMap<String, DataStream> op = new HashMap<String, DataStream>();
-									op.put(outputStreamId, outputStream);
-									availableInputs.put(boltId, op);
-								}
-							} else {
-								final TypeInformation<?> outType = TypeExtractor
-										.getForClass(SplitStreamType.class);
-
-								boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId));
-								outputStream = inputStream.transform(boltId, outType, boltWrapper);
-
-								SplitStream splitStreams = outputStream
-										.split(new FlinkStormStreamSelector());
-
-								HashMap<String, DataStream> op = new HashMap<String, DataStream>();
-								for (String outputStreamId : boltOutputStreams.keySet()) {
-									op.put(outputStreamId, splitStreams.select(outputStreamId));
-								}
-								availableInputs.put(boltId, op);
-							}
-							boltWrapper.setStormTopology(stormTopology);
-
-							int dop = 1;
-							if (common.is_set_parallelism_hint()) {
-								dop = common.get_parallelism_hint();
-								outputStream.setParallelism(dop);
-							} else {
-								common.set_parallelism_hint(1);
-							}
-							env.increaseNumberOfTasks(dop);
-
-							inputStreamsIterator.remove();
-						} else {
-							throw new RuntimeException("Cannot connect '" + boltId + "' to '"
-									+ producerId + "'. Stream '" + inputStreamId + "' not found.");
-						}
-					}
-				}
-
-				if (unprocessedInputs.size() == 0) {
-					// all inputs are connected; processing bolt completed
-					boltsIterator.remove();
-				}
-			}
-		}
-		return env;
-	}
-
-	/**
-	 * Define a new bolt in this topology with parallelism of just one thread.
-	 *
-	 * @param id
-	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
-	 * 		outputs.
-	 * @param bolt
-	 * 		the bolt
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
-		return this.setBolt(id, bolt, null);
-	}
-
-	/**
-	 * Define a new bolt in this topology with the specified amount of parallelism.
-	 *
-	 * @param id
-	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
-	 * 		outputs.
-	 * @param bolt
-	 * 		the bolt
-	 * @param parallelism_hint
-	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
-	 * 		process somewhere around the cluster.
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
-		final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint);
-		this.bolts.put(id, bolt);
-		return declarer;
-	}
-
-	/**
-	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted
-	 * kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking
-	 * process to achieve proper reliability in the topology.
-	 *
-	 * @param id
-	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
-	 * 		outputs.
-	 * @param bolt
-	 * 		the basic bolt
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
-		return this.setBolt(id, bolt, null);
-	}
-
-	/**
-	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted
-	 * kind of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking
-	 * process to achieve proper reliability in the topology.
-	 *
-	 * @param id
-	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
-	 * 		outputs.
-	 * @param bolt
-	 * 		the basic bolt
-	 * @param parallelism_hint
-	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
-	 * 		process somewhere around the cluster.
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
-		return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
-	}
-
-	/**
-	 * Define a new spout in this topology.
-	 *
-	 * @param id
-	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
-	 * 		outputs.
-	 * @param spout
-	 * 		the spout
-	 */
-	public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
-		return this.setSpout(id, spout, null);
-	}
-
-	/**
-	 * Define a new spout in this topology with the specified parallelism. If the spout declares itself as
-	 * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
-	 *
-	 * @param id
-	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
-	 * 		outputs.
-	 * @param parallelism_hint
-	 * 		the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
-	 * 		process somewhere around the cluster.
-	 * @param spout
-	 * 		the spout
-	 */
-	public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
-		final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
-		this.spouts.put(id, spout);
-		return declarer;
-	}
-
-	// TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
-	/* not implemented by Storm 0.9.4
-	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
-	 * this.stormBuilder.setStateSpout(id, stateSpout);
-	 * }
-	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
-	 * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
-	 * }
-	 */
-
-	// for internal testing purposes only
-	StormTopology getStormTopology() {
-		return this.stormTopology;
-	}
-}
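
To summarize the grouping translation implemented in createTopology() above, a hedged sketch of how the Storm groupings map onto Flink DataStream operations; component names and ExampleSpout/ExampleBolt are illustrative placeholders:

    import backtype.storm.tuple.Fields;

    import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

    public class GroupingMappingSketch {
        public static void main(final String[] args) {
            final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
            builder.setSpout("source", new ExampleSpout()); // hypothetical components

            // shuffleGrouping -> DataStream.rebalance() (round-robin)
            builder.setBolt("splitter", new ExampleBolt(), 2).shuffleGrouping("source");

            // fieldsGrouping -> DataStream.keyBy(<grouping field indexes>)
            builder.setBolt("counter", new ExampleBolt(), 2)
                    .fieldsGrouping("splitter", new Fields("word"));

            // allGrouping -> DataStream.broadcast()
            builder.setBolt("monitor", new ExampleBolt()).allGrouping("splitter");

            // globalGrouping (an empty fields list in Storm) -> DataStream.global()
            builder.setBolt("collector", new ExampleBolt()).globalGrouping("counter");

            builder.createTopology(); // performs the translation sketched above
        }
    }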