Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/09 15:03:54 UTC

[GitHub] zentol closed pull request #6880: [FLINK-10571][storm] Remove topology support

zentol closed pull request #6880: [FLINK-10571][storm] Remove topology support
URL: https://github.com/apache/flink/pull/6880

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/libs/storm_compatibility.md b/docs/dev/libs/storm_compatibility.md
index 7d7e6c8178c..8847c5bac5e 100644
--- a/docs/dev/libs/storm_compatibility.md
+++ b/docs/dev/libs/storm_compatibility.md
@@ -26,10 +26,7 @@ under the License.
 [Flink streaming]({{ site.baseurl }}/dev/datastream_api.html) is compatible with Apache Storm interfaces and therefore allows
 reusing code that was implemented for Storm.
 
-You can:
-
-- execute a whole Storm `Topology` in Flink.
-- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
+You can use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
 
 This document shows how to use existing Storm code with Flink.
 
@@ -60,48 +57,9 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example how t
 If you want to avoid large uber-jars, you can manually copy `storm-core-0.9.4.jar`, `json-simple-1.1.jar` and `flink-storm-{{site.version}}.jar` into Flink's `lib/` folder of each cluster node (*before* the cluster is started).
 For this case, it is sufficient to include only your own Spout and Bolt classes (and their internal dependencies) into the program jar.
 
-# Execute Storm Topologies
-
-Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:
-
-- `StormSubmitter` replaced by `FlinkSubmitter`
-- `NimbusClient` and `Client` replaced by `FlinkClient`
-- `LocalCluster` replaced by `FlinkLocalCluster`
-
-In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
-The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*.
-If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively.  If a parameter is not specified, the value is taken from `flink-conf.yaml`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
-
-// actual topology assembling code and used Spouts/Bolts can be used as-is
-builder.setSpout("source", new FileSpout(inputFilePath));
-builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
-builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
-builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
-
-Config conf = new Config();
-if(runLocal) { // submit to test cluster
-	// replaces: LocalCluster cluster = new LocalCluster();
-	FlinkLocalCluster cluster = new FlinkLocalCluster();
-	cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
-} else { // submit to remote cluster
-	// optional
-	// conf.put(Config.NIMBUS_HOST, "remoteHost");
-	// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-	// replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
-	FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
-}
-{% endhighlight %}
-</div>
-</div>
-
 # Embed Storm Operators in Flink Streaming Programs
 
-As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
+Spouts and Bolts can be embedded into regular streaming programs.
 The Storm compatibility layer offers a wrapper class for each, namely `SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).
 
 By default, both wrappers convert Storm output tuples to Flink's [Tuple]({{site.baseurl}}/dev/api_concepts.html#tuples-and-case-classes) types (i.e., `Tuple0` to `Tuple25`, according to the number of fields of the Storm tuples).
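
A minimal sketch of this embedded usage (assuming the `FileSpout` and `BoltTokenizer` example classes from `flink-storm-examples`; the variable names are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// SpoutWrapper embeds an unmodified Storm Spout as a Flink source
DataStream<String> text = env.addSource(
    new SpoutWrapper<String>(new FileSpout(inputFilePath)), // Storm Spout, used as-is
    TypeExtractor.getForClass(String.class));               // output type of the source

// BoltWrapper embeds an unmodified Storm Bolt as a Flink operator
DataStream<Tuple2<String, Integer>> tokens = text.transform(
    "tokenizer",                                                    // operator name
    TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
    new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()));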
@@ -182,9 +140,8 @@ See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/fli
 
 In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is given to the `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
 This `Map` is provided by the user alongside the topology and gets forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
-If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.
 
-For embedded usage, Flink's configuration mechanism must be used.
+To replicate this functionality, Flink's configuration mechanism must be used.
 A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
 Flink's regular `Configuration` class can be used to configure Spouts and Bolts.
 However, `Configuration` does not support arbitrary key data types as Storm does (only `String` keys are allowed).
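
A minimal sketch of this mechanism (the key name and value are illustrative; the entries are forwarded to `Spout.open(...)` and `Bolt.prepare(...)` by the wrappers):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Flink's Configuration extends GlobalJobParameters; only String keys are supported
Configuration config = new Configuration();
config.setString("wordsFile", "/path/to/input.txt");

// made available to embedded Spouts/Bolts via open(...)/prepare(...)
env.getConfig().setGlobalJobParameters(config);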
@@ -211,9 +168,8 @@ env.getConfig().setGlobalJobParameters(config);
 ## Multiple Output Streams
 
 Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
-If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.
 
-For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
+The output stream will be of data type `SplitStreamType<T>` and must be split using `DataStream.split(...)` and `SplitStream.select(...)`.
 Flink already provides the predefined output selector `StormStreamSelector<T>` for `.split(...)`.
 Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
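
A sketch of this split pattern (assuming a `multiStream` produced by an embedded Spout or Bolt that declares two output streams named "even" and "odd"; the stream names are illustrative):

import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

// placeholder: in practice this comes from an embedded Spout/Bolt declaring "even"/"odd"
DataStream<SplitStreamType<Integer>> multiStream = null;

// route each tuple to its declared Storm output stream
SplitStream<SplitStreamType<Integer>> split =
    multiStream.split(new StormStreamSelector<Integer>());

// select one logical stream and strip the SplitStreamType wrapper
DataStream<Integer> evenStream = split
    .select("even")
    .map(new SplitStreamMapper<Integer>())
    .returns(Integer.class); // explicit output type due to type erasure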
 
@@ -282,7 +238,6 @@ To run the examples, you need to assemble a correct jar file.
 
 There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
 Compare `pom.xml` to see how both jars are built.
-Furthermore, there is one example for whole Storm topologies (`WordCount-StormTopology.jar`).
 
 You can run each of those examples via `bin/flink run <jarname>.jar`. The correct entry point class is contained in each jar's manifest file.
 
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index a7d597a3ddf..0447ab9f92a 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -125,16 +125,6 @@ under the License.
 						</goals>
 						<configuration>
 							<artifactItems>
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class
-									</includes>
-								</artifactItem>
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
 									<artifactId>flink-storm_${scala.binary.version}</artifactId>
@@ -276,25 +266,6 @@ under the License.
 						</configuration>
 					</execution>
 
-					<!-- WordCount Storm topology-->
-					<!-- Example for whole topologies (ie, if FlinkTopology is used) -->
-					<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
-					     However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
-					     Thus, 'defaults.yaml' is not available for maven-jar-plugin.
-					     Nevertheless, we register an empty jar with corresponding name, such that the final jar can be installed to local maven repository.
-					     We use maven-shade-plugin to build the actual jar (which will replace the empty jar). -->
-					<execution>
-						<id>WordCount-StormTopology</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount</finalName>
-							<classifier>StormTopology</classifier>
-						</configuration>
-					</execution>
-
 					<execution>
 						<goals>
 							<goal>test-jar</goal>
@@ -302,86 +273,6 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
-
-			<!-- WordCount Storm topology-->
-			<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
-			<!-- Build StormTopolgy jar to overwrite empty jar created with maven-jar-plugin. -->
-			<plugin>
-				<artifactId>maven-shade-plugin</artifactId>
-				<groupId>org.apache.maven.plugins</groupId>
-				<executions>
-					<execution>
-						<id>WordCount-StormTopology</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount-StormTopology</finalName>
-
-							<artifactSet>
-								<includes>
-									<include>org.apache.storm:storm-core</include>
-									<!-- Storm's recursive dependencies -->
-									<include>org.yaml:snakeyaml</include>
-									<include>com.googlecode.json-simple:json-simple</include>
-									<include>org.apache.flink:flink-storm_${scala.binary.version}</include>
-									<include>org.apache.flink:flink-storm-examples_${scala.binary.version}</include>
-								</includes>
-							</artifactSet>
-							<filters>
-								<filter>
-									<artifact>org.apache.storm:storm-core</artifact>
-									<includes>
-										<include>defaults.yaml</include>
-										<include>org/apache/storm/*.class</include>
-										<include>org/apache/storm/topology/*.class</include>
-										<include>org/apache/storm/spout/*.class</include>
-										<include>org/apache/storm/task/*.class</include>
-										<include>org/apache/storm/tuple/*.class</include>
-										<include>org/apache/storm/generated/*.class</include>
-										<include>org/apache/storm/metric/**/*.class</include>
-										<include>org/apache/storm/utils/*.class</include>
-										<include>org/apache/storm/serialization/*.class</include>
-										<include>org/apache/storm/curator/**/*.class</include>
-										<include>org/apache/storm/grouping/**/*.class</include>
-										<include>org/apache/storm/thrift/**/*.class</include>
-										<!-- Storm's recursive dependencies -->
-										<include>org/json/simple/**/*.class</include>
-										<include>org/yaml/snakeyaml/**/*.class</include>
-										<include>org/apache/storm/shade/**/*.class</include>
-									</includes>
-								</filter>
-								<filter>
-									<artifact>org.apache.flink:flink-storm-examples_${scala.binary.version}</artifact>
-									<includes>
-										<include>org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.class
-										</include>
-										<include>org/apache/flink/storm/wordcount/WordCountTopology.class</include>
-										<include>org/apache/flink/storm/wordcount/operators/*.class</include>
-										<include>org/apache/flink/storm/util/*.class</include>
-										<include>org/apache/flink/storm/wordcount/util/WordCountData.class</include>
-									</includes>
-								</filter>
-								<filter>
-									<artifact>org.apache.flink:flink-storm_${scala.binary.version}</artifact>
-									<includes>
-										<include>org/apache/flink/storm/api/*.class</include>
-										<include>org/apache/flink/storm/util/*.class</include>
-										<include>org/apache/flink/storm/wrappers/*.class</include>
-									</includes>
-								</filter>
-							</filters>
-							<transformers>
-								<transformer
-									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter</mainClass>
-								</transformer>
-							</transformers>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
 		</plugins>
 
 		<pluginManagement>
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
deleted file mode 100644
index 6108f791a9a..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.exclamation;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
-
-import org.apache.storm.Config;
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology} and submitted to
- * Flink for execution in the same way as to a Storm {@link org.apache.storm.LocalCluster}.
- *
- * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a
- * {@link org.apache.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>ExclamationLocal &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink</li>
- * </ul>
- */
-public class ExclamationLocal {
-
-	public static final String TOPOLOGY_ID = "Streaming Exclamation";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!ExclamationTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final TopologyBuilder builder = ExclamationTopology.buildTopology();
-
-		// execute program locally
-		Config conf = new Config();
-		conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
-		conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(TOPOLOGY_ID, conf, FlinkTopology.createTopology(builder));
-		cluster.shutdown();
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
deleted file mode 100644
index 9de1684dba1..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.exclamation;
-
-import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.storm.util.BoltPrintSink;
-import org.apache.flink.storm.util.FiniteFileSpout;
-import org.apache.flink.storm.util.FiniteInMemorySpout;
-import org.apache.flink.storm.util.OutputFormatter;
-import org.apache.flink.storm.util.SimpleOutputFormatter;
-import org.apache.flink.storm.wordcount.util.WordCountData;
-
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * Implements the "Exclamation" program that attaches two exclamation marks to every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}.
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] &lt;text path&gt;
- * &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>construct a regular Storm topology as Flink program</li>
- * <li>make use of the FiniteSpout interface</li>
- * </ul>
- */
-public class ExclamationTopology {
-
-	private static final String spoutId = "source";
-	private static final String firstBoltId = "exclamation1";
-	private static final String secondBoltId = "exclamation2";
-	private static final String sinkId = "sink";
-	private static final OutputFormatter formatter = new SimpleOutputFormatter();
-
-	public static TopologyBuilder buildTopology() {
-		final TopologyBuilder builder = new TopologyBuilder();
-
-		// get input data
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = textPath.split(":");
-			final String inputFile = tokens[tokens.length - 1];
-			builder.setSpout(spoutId, new FiniteFileSpout(inputFile));
-		} else {
-			builder.setSpout(spoutId, new FiniteInMemorySpout(WordCountData.WORDS));
-		}
-
-		builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
-		builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId);
-
-		// emit result
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter))
-			.shuffleGrouping(secondBoltId);
-		} else {
-			builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
-			.shuffleGrouping(secondBoltId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInputOutput = false;
-	private static String textPath;
-	private static String outputPath;
-	private static int exclamationNum = 3;
-
-	static int getExclamation() {
-		return exclamationNum;
-	}
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileInputOutput = true;
-			if (args.length == 3) {
-				textPath = args[0];
-				outputPath = args[1];
-				exclamationNum = Integer.parseInt(args[2]);
-			} else {
-				System.err.println("Usage: StormExclamation* <text path> <result path>  <number of exclamation marks>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing StormExclamation example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: StormExclamation <text path> <result path> <number of exclamation marks>");
-		}
-
-		return true;
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
deleted file mode 100644
index b2ad05f1bb5..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.join;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.storm.util.NullTerminatingSpout;
-import org.apache.flink.storm.util.TupleOutputFormatter;
-
-import org.apache.storm.Config;
-import org.apache.storm.starter.bolt.PrinterBolt;
-import org.apache.storm.starter.bolt.SingleJoinBolt;
-import org.apache.storm.testing.FeederSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-/**
- * Implements a simple example where 2 input streams are being joined.
- */
-public class SingleJoinExample {
-
-	public static void main(String[] args) throws Exception {
-		final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender", "hobbies"));
-		final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
-
-		Config conf = new Config();
-		TopologyBuilder builder = new TopologyBuilder();
-
-		//  only required to stabilize integration test
-		conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);
-		final NullTerminatingSpout finalGenderSpout = new NullTerminatingSpout(genderSpout);
-		final NullTerminatingSpout finalAgeSpout  = new NullTerminatingSpout(ageSpout);
-
-		builder.setSpout("gender", finalGenderSpout);
-		builder.setSpout("age", finalAgeSpout);
-		builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
-			.fieldsGrouping("gender", new Fields("id"))
-			.fieldsGrouping("age", new Fields("id"));
-
-		// emit result
-		if (args.length > 0) {
-			// read the text file from given input path
-			builder.setBolt("fileOutput", new BoltFileSink(args[0], new TupleOutputFormatter()))
-				.shuffleGrouping("join");
-		} else {
-			builder.setBolt("print", new PrinterBolt()).shuffleGrouping("join");
-		}
-
-		String[] hobbies = new String[] {"reading", "biking", "travelling", "watching tv"};
-
-		for (int i = 0; i < 10; i++) {
-			String gender;
-			if (i % 2 == 0) {
-				gender = "male";
-			}
-			else {
-				gender = "female";
-			}
-			genderSpout.feed(new Values(i, gender, hobbies[i % hobbies.length]));
-		}
-
-		for (int i = 9; i >= 0; i--) {
-			ageSpout.feed(new Values(i, i + 20));
-		}
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology("joinTopology", conf, FlinkTopology.createTopology(builder));
-		cluster.shutdown();
-	}
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
deleted file mode 100644
index 6157e2cf495..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.print;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-
-import org.apache.storm.Config;
-import org.apache.storm.starter.bolt.PrinterBolt;
-import org.apache.storm.starter.spout.TwitterSampleSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Arrays;
-
-/**
- * Prints incoming tweets. Tweets can be filtered by keywords.
- */
-public class PrintSampleStream {
-
-	public static void main(String[] args) throws Exception {
-
-		if (args.length < 4) {
-			System.err.println(
-					"Usage: PrintSampleStream <consumer-key> <consumer-secret> <access-token> <access-token-secret>");
-			return;
-		}
-
-		String consumerKey = args[0];
-		String consumerSecret = args[1];
-		String accessToken = args[2];
-		String accessTokenSecret = args[3];
-
-		// keywords start with the 5th parameter
-		String[] keyWords = Arrays.copyOfRange(args, 4, args.length);
-
-		TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret,
-								accessToken, accessTokenSecret, keyWords));
-		builder.setBolt("print", new PrinterBolt())
-				.shuffleGrouping("twitter");
-
-		Config conf = new Config();
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology("Print", conf, FlinkTopology.createTopology(builder));
-
-		Utils.sleep(10 * 1000);
-
-		cluster.shutdown();
-	}
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
deleted file mode 100644
index 71983274f4d..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wordcount;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.wordcount.util.WordCountData;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
- * same way as to a Storm {@link LocalCluster}.
- *
- * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
- * via Flink command line clients (ie, bin/flink).
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink</li>
- * </ul>
- */
-public class WordCountLocal {
-	private static final String topologyId = "Storm WordCount";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!WordCountTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final TopologyBuilder builder = WordCountTopology.buildTopology();
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		Config conf = new Config();
-		conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
-		cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
-		cluster.shutdown();
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
deleted file mode 100644
index dcad567e4b7..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wordcount;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.wordcount.util.WordCountData;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
- * same way as to a Storm {@link LocalCluster}. In contrast to {@link WordCountLocal} all bolts access the field of
- * input tuples by name instead of index.
- *
- * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
- * via Flink command line clients (ie, bin/flink).
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCountLocalByName &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink
- * </ul>
- */
-public class WordCountLocalByName {
-	private static final String topologyId = "Storm WordCountName";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!WordCountTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final TopologyBuilder builder = WordCountTopology.buildTopology(false);
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		Config conf = new Config();
-		conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
-		cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
-		cluster.shutdown();
-	}
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
deleted file mode 100644
index 9ad0d633e04..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wordcount;
-
-import org.apache.flink.storm.api.FlinkClient;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.wordcount.util.WordCountData;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.NotAliveException;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
- * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote.
- *
- * <p>This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via
- * Flink command line clients (ie, bin/flink).
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCountRemoteByClient &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class WordCountRemoteByClient {
-	private static final String topologyId = "Storm WordCount";
-	private static final String uploadedJarLocation = "WordCount-StormTopology.jar";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
-	NotAliveException {
-
-		if (!WordCountTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final TopologyBuilder builder = WordCountTopology.buildTopology();
-
-		// execute program on Flink cluster
-		final Config conf = new Config();
-		// can be changed to remote address
-		conf.put(Config.NIMBUS_HOST, "localhost");
-		// use default flink jobmanger.rpc.port
-		conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-
-		final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
-		cluster.submitTopology(topologyId, uploadedJarLocation, FlinkTopology.createTopology(builder));
-
-		Utils.sleep(5 * 1000);
-
-		cluster.killTopology(topologyId);
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
deleted file mode 100644
index 2d1a0b5f610..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wordcount;
-
-import org.apache.flink.storm.api.FlinkClient;
-import org.apache.flink.storm.api.FlinkSubmitter;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.wordcount.util.WordCountData;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
- * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote.
- *
- * <p>This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink).
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage: <code>WordCountRemoteBySubmitter &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class WordCountRemoteBySubmitter {
-	private static final String topologyId = "Storm WordCount";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!WordCountTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final TopologyBuilder builder = WordCountTopology.buildTopology();
-
-		// execute program on Flink cluster
-		final Config conf = new Config();
-		// We can set Jobmanager host/port values manually or leave them blank
-		// if not set and
-		// - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter
-		// - executed via bin/flink values from flink-conf.yaml are set by FlinkSubmitter.
-		// conf.put(Config.NIMBUS_HOST, "localhost");
-		// conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
-
-		// The user jar file must be specified via JVM argument if executed via Java.
-		// => -Dstorm.jar=target/WordCount-StormTopology.jar
-		// If bin/flink is used, the jar file is detected automatically.
-		FlinkSubmitter.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
-
-		Thread.sleep(5 * 1000);
-
-		FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
deleted file mode 100644
index 3ae7c5ce561..00000000000
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wordcount;
-
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.storm.util.BoltPrintSink;
-import org.apache.flink.storm.util.NullTerminatingSpout;
-import org.apache.flink.storm.util.OutputFormatter;
-import org.apache.flink.storm.util.TupleOutputFormatter;
-import org.apache.flink.storm.wordcount.operators.BoltCounter;
-import org.apache.flink.storm.wordcount.operators.BoltCounterByName;
-import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
-import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName;
-import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
-import org.apache.flink.storm.wordcount.operators.WordCountInMemorySpout;
-import org.apache.flink.storm.wordcount.util.WordCountData;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology}.
- *
- * <p>The input is a plain text file with lines separated by newline characters.
- *
- * <p>Usage:
- * <code>WordCount[Local|LocalByName|RemoteByClient|RemoteBySubmitter] &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>how to construct a regular Storm topology as Flink program</li>
- * </ul>
- */
-public class WordCountTopology {
-	private static final String spoutId = "source";
-	private static final String tokenierzerId = "tokenizer";
-	private static final String counterId = "counter";
-	private static final String sinkId = "sink";
-	private static final OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static TopologyBuilder buildTopology() {
-		return buildTopology(true);
-	}
-
-	public static TopologyBuilder buildTopology(boolean indexOrName) {
-
-		final TopologyBuilder builder = new TopologyBuilder();
-
-		// get input data
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = textPath.split(":");
-			final String inputFile = tokens[tokens.length - 1];
-			// inserting NullTerminatingSpout only required to stabilize integration test
-			builder.setSpout(spoutId, new NullTerminatingSpout(new WordCountFileSpout(inputFile)));
-		} else {
-			builder.setSpout(spoutId, new WordCountInMemorySpout());
-		}
-
-		if (indexOrName) {
-			// split up the lines in pairs (2-tuples) containing: (word,1)
-			builder.setBolt(tokenierzerId, new BoltTokenizer(), 4).shuffleGrouping(spoutId);
-			// group by the tuple field "0" and sum up tuple field "1"
-			builder.setBolt(counterId, new BoltCounter(), 4).fieldsGrouping(tokenierzerId,
-					new Fields(BoltTokenizer.ATTRIBUTE_WORD));
-		} else {
-			// split up the lines in pairs (2-tuples) containing: (word,1)
-			builder.setBolt(tokenierzerId, new BoltTokenizerByName(), 4).shuffleGrouping(
-					spoutId);
-			// group by the tuple field "0" and sum up tuple field "1"
-			builder.setBolt(counterId, new BoltCounterByName(), 4).fieldsGrouping(
-					tokenierzerId, new Fields(BoltTokenizerByName.ATTRIBUTE_WORD));
-		}
-
-		// emit result
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter)).shuffleGrouping(counterId);
-		} else {
-			builder.setBolt(sinkId, new BoltPrintSink(formatter), 4).shuffleGrouping(counterId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInputOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileInputOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount* <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: WordCount* <text path> <result path>");
-		}
-
-		return true;
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
deleted file mode 100644
index c82da37c893..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.exclamation;
-
-import org.apache.flink.storm.exclamation.util.ExclamationData;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-/**
- * Test for the ExclamationLocal example.
- */
-public class StormExclamationLocalITCase extends AbstractTestBase {
-
-	@Test
-	public void testProgram() throws Exception {
-		String textPath = createTempFile("text.txt", WordCountData.TEXT);
-		String resultPath = getTempDirPath("result");
-		String exclamationNum = "3";
-
-		ExclamationLocal.main(new String[]{textPath, resultPath, exclamationNum});
-
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath);
-	}
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
deleted file mode 100644
index c00c1542e3e..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.join;
-
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
-
-import org.junit.Test;
-
-/**
- * Test for the SingleJoin example.
- */
-public class SingleJoinITCase extends AbstractTestBase {
-
-	protected static String[] expectedOutput = {
-			"(male,20)",
-			"(female,21)",
-			"(male,22)",
-			"(female,23)",
-			"(male,24)",
-			"(female,25)",
-			"(male,26)",
-			"(female,27)",
-			"(male,28)",
-			"(female,29)"
-	};
-
-	@Test
-	public void testProgram() throws Exception {
-		String resultPath = getTempDirPath("result");
-		// We need to remove the file scheme because we can't use the Flink file system.
-		// (to remain compatible with Storm)
-		SingleJoinExample.main(new String[]{resultPath.replace("file:", "")});
-
-		compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), resultPath);
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
deleted file mode 100644
index 90ee795317f..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.split;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-/**
- * A bolt for splitting an input stream containing numbers based on whether they are even or odd.
- */
-public class SplitBolt extends BaseRichBolt {
-	private static final long serialVersionUID = -6627606934204267173L;
-
-	public static final String EVEN_STREAM = "even";
-	public static final String ODD_STREAM = "odd";
-
-	private OutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		this.collector = collector;
-
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		if (input.getInteger(0) % 2 == 0) {
-			this.collector.emit(EVEN_STREAM, new Values(input.getInteger(0)));
-		} else {
-			this.collector.emit(ODD_STREAM, new Values(input.getInteger(0)));
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		Fields schema = new Fields("number");
-		declarer.declareStream(EVEN_STREAM, schema);
-		declarer.declareStream(ODD_STREAM, schema);
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
deleted file mode 100644
index c00284035db..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.split;
-
-import org.apache.flink.storm.split.operators.RandomSpout;
-import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.storm.util.BoltPrintSink;
-import org.apache.flink.storm.util.OutputFormatter;
-import org.apache.flink.storm.util.TupleOutputFormatter;
-
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * A simple topology that splits a stream of numbers based on their parity, and verifies the result.
- */
-public class SplitBoltTopology {
-	private static final String spoutId = "randomSource";
-	private static final String boltId = "splitBolt";
-	private static final String evenVerifierId = "evenVerifier";
-	private static final String oddVerifierId = "oddVerifier";
-	private static final String sinkId = "sink";
-	private static final OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static TopologyBuilder buildTopology() {
-		final TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout(spoutId, new RandomSpout(false, seed));
-		builder.setBolt(boltId, new SplitBolt()).shuffleGrouping(spoutId);
-		builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(boltId,
-				SplitBolt.EVEN_STREAM);
-		builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(boltId,
-				SplitBolt.ODD_STREAM);
-
-		// emit result
-		if (outputPath != null) {
-			// strip a potential scheme prefix (e.g. "file:") from the output path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter))
-				.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		} else {
-			builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
-				.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static long seed = System.currentTimeMillis();
-	private static String outputPath = null;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 2) {
-				seed = Long.parseLong(args[0]);
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: SplitStreamBoltLocal <seed> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing SplitBoltTopology example with random data");
-			System.out.println("  Usage: SplitStreamBoltLocal <seed> <result path>");
-		}
-
-		return true;
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
index 7152cf2e6cc..944000ca369 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
@@ -53,16 +53,4 @@ public void testEmbeddedSpout() throws Exception {
 		Assert.assertFalse(Enrich.errorOccured);
 	}
 
-	@Test
-	public void testSpoutSplitTopology() throws Exception {
-		SplitStreamSpoutLocal.main(new String[] { "0", output });
-		Assert.assertFalse(VerifyAndEnrichBolt.errorOccured);
-	}
-
-	@Test
-	public void testBoltSplitTopology() throws Exception {
-		SplitStreamBoltLocal.main(new String[] { "0", output });
-		Assert.assertFalse(VerifyAndEnrichBolt.errorOccured);
-	}
-
 }
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
deleted file mode 100644
index aa92a959995..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.split;
-
-import org.apache.flink.storm.split.operators.RandomSpout;
-import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.storm.util.BoltPrintSink;
-import org.apache.flink.storm.util.OutputFormatter;
-import org.apache.flink.storm.util.TupleOutputFormatter;
-
-import org.apache.storm.topology.TopologyBuilder;
-
-/**
- * A simple topology similar to the {@link SplitBoltTopology}, except that the split streams are generated directly in
- * a spout.
- */
-public class SplitSpoutTopology {
-	private static final String spoutId = "randomSplitSource";
-	private static final String evenVerifierId = "evenVerifier";
-	private static final String oddVerifierId = "oddVerifier";
-	private static final String sinkId = "sink";
-	private static final OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static TopologyBuilder buildTopology() {
-		final TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout(spoutId, new RandomSpout(true, seed));
-		builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId,
-				RandomSpout.EVEN_STREAM);
-		builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(spoutId,
-				RandomSpout.ODD_STREAM);
-
-		// emit result
-		if (outputPath != null) {
-			// strip a potential scheme prefix (e.g. "file:") from the output path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter))
-				.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		} else {
-			builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
-				.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static long seed = System.currentTimeMillis();
-	private static String outputPath = null;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 2) {
-				seed = Long.parseLong(args[0]);
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: SplitStreamSpoutLocal <seed> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing SplitSpoutTopology example with random data");
-			System.out.println("  Usage: SplitStreamSpoutLocal <seed> <result path>");
-		}
-
-		return true;
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
deleted file mode 100644
index 55c3bd39fb7..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.split;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-/**
- * An example using the {@link SplitBoltTopology}.
- */
-public class SplitStreamBoltLocal {
-	private static final String topologyId = "Bolt split stream example";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!SplitBoltTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final TopologyBuilder builder = SplitBoltTopology.buildTopology();
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
-
-		// run topology for 5 seconds
-		Utils.sleep(5 * 1000);
-
-		cluster.shutdown();
-	}
-
-}
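
With FlinkLocalCluster removed, a local runner like the one above reduces to a plain local StreamExecutionEnvironment; individual bolts can still be embedded through the wrapper classes that survive this change. A rough sketch under that assumption; MyBolt is a hypothetical stand-in for any unmodified single-output Storm bolt emitting Integer values:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedBoltSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		DataStream<Integer> numbers = env.fromElements(1, 2, 3);

		// the unmodified Storm bolt runs as a regular Flink operator
		DataStream<Integer> processed = numbers.transform(
				"embedded bolt",
				BasicTypeInfo.INT_TYPE_INFO,
				new BoltWrapper<Integer, Integer>(new MyBolt())); // MyBolt: hypothetical IRichBolt

		processed.print();
		env.execute("embedded bolt sketch");
	}
}
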
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
deleted file mode 100644
index da6e574131a..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.split;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-/**
- * An example using the {@link SplitSpoutTopology}.
- */
-public class SplitStreamSpoutLocal {
-	private static final String topologyId = "Spout split stream example";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!SplitSpoutTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final TopologyBuilder builder = SplitSpoutTopology.buildTopology();
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
-
-		// run topology for 5 seconds
-		Utils.sleep(5 * 1000);
-
-		cluster.shutdown();
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
deleted file mode 100644
index 69059537875..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
-import org.apache.flink.storm.tests.operators.TaskIdBolt;
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.util.MathUtils;
-
-import org.apache.storm.Config;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This test relies on the hash function used by {@link DataStream#keyBy}, which is
- * assumed to be {@link MathUtils#murmurHash}.
- */
-public class StormFieldsGroupingITCase extends AbstractTestBase {
-
-	private static final String topologyId = "FieldsGrouping Test";
-	private static final String spoutId = "spout";
-	private static final String boltId = "bolt";
-	private static final String sinkId = "sink";
-
-	@Test
-	public void testProgram() throws Exception {
-		String resultPath = this.getTempDirPath("result");
-
-		final String[] tokens = resultPath.split(":");
-		final String outputFile = tokens[tokens.length - 1];
-
-		final TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
-		builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
-				spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
-		builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		Config conf = new Config();
-		conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
-		cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
-		cluster.shutdown();
-
-		List<String> expectedResults = Arrays.asList(
-			"-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947",
-			"-65105105", "-518907128", "-252332814");
-
-		List<String> actualResults = new ArrayList<>();
-		readAllResultLines(actualResults, resultPath, new String[0], false);
-
-		// remove potential operator id prefix
-		for (int i = 0; i < actualResults.size(); ++i) {
-			String s = actualResults.get(i);
-			if (s.contains(">")) {
-				s = s.substring(s.indexOf(">") + 2);
-				actualResults.set(i, s);
-			}
-		}
-
-		Assert.assertEquals(expectedResults.size(), actualResults.size());
-		Collections.sort(actualResults);
-		Collections.sort(expectedResults);
-		System.out.println(actualResults);
-		for (int i = 0; i < actualResults.size(); ++i) {
-			// compare expected to actual results with the prefix removed (the prefix depends, e.g., on the hash function used)
-			Assert.assertEquals(expectedResults.get(i), actualResults.get(i));
-		}
-	}
-
-}
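
The expected values hard-coded in the removed test were tied to murmur hashing because fieldsGrouping was translated to keyBy. A minimal sketch of the keyBy counterpart in plain DataStream code; the grouping key and the reduce step are illustrative:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBySketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements(1, 2, 3, 4)
			// hash-partitions by key, like fieldsGrouping(..., new Fields("number"))
			.keyBy(new KeySelector<Integer, Integer>() {
				@Override
				public Integer getKey(Integer value) {
					return value % 2;
				}
			})
			.reduce((a, b) -> a + b) // any keyed operation sees the partitioned stream
			.print();

		env.execute("keyBy sketch");
	}
}
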
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
deleted file mode 100644
index c24a95e30bb..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.tests.operators.MetaDataSpout;
-import org.apache.flink.storm.tests.operators.VerifyMetaDataBolt;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for metadata spouts/bolts.
- */
-public class StormMetaDataITCase extends AbstractTestBase {
-
-	private static final String topologyId = "FieldsGrouping Test";
-	private static final String spoutId = "spout";
-	private static final String boltId1 = "bolt1";
-	private static final String boltId2 = "bolt2";
-
-	@Test
-	public void testProgram() throws Exception {
-		final TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout(spoutId, new MetaDataSpout(), 2);
-		builder.setBolt(boltId1, new VerifyMetaDataBolt(), 2).localOrShuffleGrouping(spoutId,
-				MetaDataSpout.STREAM_ID);
-		builder.setBolt(boltId2, new VerifyMetaDataBolt()).shuffleGrouping(boltId1,
-				VerifyMetaDataBolt.STREAM_ID);
-
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
-
-		// run topology for 5 seconds
-		Utils.sleep(5 * 1000);
-
-		cluster.shutdown();
-
-		Assert.assertFalse(VerifyMetaDataBolt.errorOccured);
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
deleted file mode 100644
index 6f6e47fab0f..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
-import org.apache.flink.storm.tests.operators.MergerBolt;
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.apache.storm.Config;
-import org.apache.storm.topology.TopologyBuilder;
-import org.junit.Test;
-
-/**
- * Test for the {@link MergerBolt}.
- */
-public class StormUnionITCase extends AbstractTestBase {
-
-	private static final String RESULT = "-1154715079\n" + "-1155869325\n" + "-1155484576\n"
-			+ "431529176\n" + "1260042744\n" + "1761283695\n" + "1749940626\n" + "892128508\n"
-			+ "155629808\n" + "1429008869\n" + "-1465154083\n" + "-723955400\n" + "-423279216\n"
-			+ "17850135\n" + "2133836778\n" + "1033096058\n" + "-1690734402\n" + "-1557280266\n"
-			+ "1327362106\n" + "-1930858313\n" + "502539523\n" + "-1728529858\n" + "-938301587\n"
-			+ "-624140595\n" + "-60658084\n" + "142959438\n" + "-613647601\n" + "-330177159\n"
-			+ "-54027108\n" + "1945002173\n" + "979930868";
-
-	private static final String topologyId = "Multiple Input Streams Test";
-	private static final String spoutId1 = "spout1";
-	private static final String spoutId2 = "spout2";
-	private static final String spoutId3 = "spout3";
-	private static final String boltId = "merger";
-	private static final String sinkId = "sink";
-
-	@Test
-	public void testProgram() throws Exception {
-		String resultPath = this.getTempDirPath("result");
-
-		final TopologyBuilder builder = new TopologyBuilder();
-
-		// get input data
-		builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
-		builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
-		builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
-
-		builder.setBolt(boltId, new MergerBolt())
-				.shuffleGrouping(spoutId1, FiniteRandomSpout.STREAM_PREFIX + 0)
-				.shuffleGrouping(spoutId2, FiniteRandomSpout.STREAM_PREFIX + 0)
-				.shuffleGrouping(spoutId3, FiniteRandomSpout.STREAM_PREFIX + 0);
-
-		final String[] tokens = resultPath.split(":");
-		final String outputFile = tokens[tokens.length - 1];
-		builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
-
-		// execute program locally
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		Config conf = new Config();
-		conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
-		cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
-		cluster.shutdown();
-
-		compareResultsByLinesInMemory(RESULT, resultPath);
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
deleted file mode 100644
index 923f9dff210..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests.operators;
-
-import org.apache.flink.storm.util.FiniteSpout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.Random;
-
-/**
- * A Spout implementation that broadcasts random numbers to a specified number of output streams until a specified
- * emission count is reached.
- */
-public class FiniteRandomSpout extends BaseRichSpout implements FiniteSpout {
-	private static final long serialVersionUID = 6592885571932363239L;
-
-	public static final String STREAM_PREFIX = "stream_";
-
-	private final Random r;
-	private SpoutOutputCollector collector;
-	private int counter;
-	private final String[] outputStreams;
-
-	public FiniteRandomSpout(long seed, int counter, int numberOfOutputStreams) {
-		this.r = new Random(seed);
-		this.counter = counter;
-		if (numberOfOutputStreams < 1) {
-			this.outputStreams = new String[] { Utils.DEFAULT_STREAM_ID };
-		} else {
-			this.outputStreams = new String[numberOfOutputStreams];
-			for (int i = 0; i < this.outputStreams.length; ++i) {
-				this.outputStreams[i] = STREAM_PREFIX + i;
-			}
-		}
-	}
-
-	public FiniteRandomSpout(long seed, int counter) {
-		this(seed, counter, 1);
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void nextTuple() {
-		for (String s : this.outputStreams) {
-			this.collector.emit(s, new Values(this.r.nextInt()));
-		}
-		--this.counter;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		for (String s : this.outputStreams) {
-			declarer.declareStream(s, new Fields("number"));
-		}
-	}
-
-	@Override
-	public boolean reachedEnd() {
-		return this.counter <= 0;
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java
deleted file mode 100644
index c04bce308a1..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MergerBolt.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests.operators;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.Map;
-
-/**
- * A Bolt implementation that forwards all incoming tuples to a single output stream.
- */
-public class MergerBolt extends BaseRichBolt {
-	private static final long serialVersionUID = -7966475984592762720L;
-
-	private OutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		this.collector.emit(input.getValues());
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("number"));
-	}
-
-}
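
The merge pattern that MergerBolt implemented (several shuffleGrouping inputs feeding one bolt) is covered natively by DataStream#union, so nothing needs to replace it one-to-one after this removal. A minimal sketch with illustrative inputs:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> s1 = env.fromElements(1, 2);
		DataStream<Integer> s2 = env.fromElements(3, 4);
		DataStream<Integer> s3 = env.fromElements(5, 6);

		// union replaces a merging bolt with multiple shuffleGrouping inputs
		s1.union(s2, s3).print();

		env.execute("union sketch");
	}
}
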
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java
deleted file mode 100644
index 4061a711628..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/MetaDataSpout.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests.operators;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-/**
- * A Spout implementation emitting metadata.
- */
-public class MetaDataSpout extends BaseRichSpout {
-	private static final long serialVersionUID = 5305870218033256376L;
-
-	public static final String STREAM_ID = "spoutMeta";
-
-	private SpoutOutputCollector collector;
-	private TopologyContext context;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		this.collector = collector;
-		this.context = context;
-	}
-
-	@Override
-	public void nextTuple() {
-		this.collector.emit(STREAM_ID, new Values(this.context.getThisComponentId(), STREAM_ID,
-				this.context.getThisTaskId()));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declareStream(STREAM_ID, new Fields("id", "sid", "tid"));
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
deleted file mode 100644
index 1a8f218dbf0..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests.operators;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-/**
- * Bolt that prefixes all incoming tuple values with the task id.
- */
-public class TaskIdBolt extends BaseRichBolt {
-	private static final long serialVersionUID = -7966475984592762720L;
-
-	private OutputCollector collector;
-	private int thisTaskId;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		this.collector = collector;
-		this.thisTaskId = context.getThisTaskId();
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		this.collector.emit(new Values(this.thisTaskId + "> " + input.getValue(0)));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("number"));
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java
deleted file mode 100644
index 059882bdd09..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.tests.operators;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.MessageId;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-/**
- * A Bolt implementation that verifies metadata emitted by a {@link MetaDataSpout}.
- */
-public class VerifyMetaDataBolt extends BaseRichBolt {
-	private static final long serialVersionUID = 1353222852073800478L;
-
-	public static final String STREAM_ID = "boltMeta";
-
-	private OutputCollector collector;
-	private TopologyContext context;
-
-	public static boolean errorOccured = false;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		this.collector = collector;
-		this.context = context;
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		if (!input.getSourceComponent().equals(input.getString(0))
-				|| !input.getSourceStreamId().equals(input.getString(1))
-				|| !input.getSourceGlobalStreamid().get_componentId().equals(input.getString(0))
-				|| !input.getSourceGlobalStreamid().get_streamId().equals(input.getString(1))
-				|| input.getSourceTask() != input.getInteger(2).intValue()
-				|| !input.getMessageId().equals(MessageId.makeUnanchored())
-				|| input.getMessageId().getAnchors().size() != 0
-				|| input.getMessageId().getAnchorsToIds().size() != 0) {
-			errorOccured = true;
-		}
-		this.collector.emit(STREAM_ID, new Values(this.context.getThisComponentId(), STREAM_ID,
-				this.context.getThisTaskId()));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declareStream(STREAM_ID, new Fields("id", "sid", "tid"));
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
deleted file mode 100644
index 16844e54785..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wordcount;
-
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-/**
- * Test for the WordCountLocal example.
- */
-public class WordCountLocalITCase extends AbstractTestBase {
-
-	@Test
-	public void testProgram() throws Exception {
-		String textPath = createTempFile("text.txt", WordCountData.TEXT);
-		String resultPath = getTempDirPath("result");
-
-		WordCountLocal.main(new String[]{textPath, resultPath});
-
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-	}
-
-}
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
deleted file mode 100644
index 0353c2c56b0..00000000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wordcount;
-
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-/**
- * Test for the WordCountLocalByName example.
- */
-public class WordCountLocalNamedITCase extends AbstractTestBase {
-
-	@Test
-	public void testProgram() throws Exception {
-		String textPath = createTempFile("text.txt", WordCountData.TEXT);
-		String resultPath = getTempDirPath("result");
-
-		WordCountLocalByName.main(new String[]{textPath, resultPath});
-
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-	}
-
-}
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
deleted file mode 100644
index f4ea659dcee..00000000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
-import org.apache.flink.storm.util.StormConfig;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.esotericsoftware.kryo.Serializer;
-import org.apache.storm.Config;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.NotAliveException;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import scala.Some;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
- * Flink's JobManager instead of Storm's Nimbus.
- */
-public class FlinkClient {
-
-	/** The log used by this client. */
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class);
-
-	/** The client's configuration. */
-	private final Map<?, ?> conf;
-	/** The jobmanager's host name. */
-	private final String jobManagerHost;
-	/** The jobmanager's rpc port. */
-	private final int jobManagerPort;
-	/** The user specified timeout in milliseconds. */
-	private final String timeout;
-
-	// The following methods are derived from "backtype.storm.utils.NimbusClient"
-
-	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
-	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} in the given configuration are ignored.
-	 *
-	 * @param conf
-	 * 		A configuration.
-	 * @param host
-	 * 		The jobmanager's host name.
-	 * @param port
-	 * 		The jobmanager's rpc port.
-	 */
-	@SuppressWarnings("rawtypes")
-	public FlinkClient(final Map conf, final String host, final int port) {
-		this(conf, host, port, null);
-	}
-
-	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
-	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} in the given configuration are ignored.
-	 *
-	 * @param conf
-	 * 		A configuration.
-	 * @param host
-	 * 		The jobmanager's host name.
-	 * @param port
-	 * 		The jobmanager's rpc port.
-	 * @param timeout
-	 * 		Timeout
-	 */
-	@SuppressWarnings("rawtypes")
-	public FlinkClient(final Map conf, final String host, final int port, final Integer timeout) {
-		this.conf = conf;
-		this.jobManagerHost = host;
-		this.jobManagerPort = port;
-		if (timeout != null) {
-			this.timeout = timeout + " ms";
-		} else {
-			this.timeout = null;
-		}
-	}
-
-	/**
-	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
-	 * Config#NIMBUS_THRIFT_PORT} as JobManager address.
-	 *
-	 * @param conf
-	 * 		Configuration that contains the jobmanager's hostname and port.
-	 * @return A configured {@link FlinkClient}.
-	 */
-	@SuppressWarnings("rawtypes")
-	public static FlinkClient getConfiguredClient(final Map conf) {
-		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
-		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
-		return new FlinkClient(conf, nimbusHost, nimbusPort);
-	}
-
-	/**
-	 * Return a reference to itself.
-	 *
-	 * <p>{@link FlinkClient} mimics both {@link NimbusClient} and {@link Nimbus}{@code .Client} at once.
-	 *
-	 * @return A reference to itself.
-	 */
-	public FlinkClient getClient() {
-		return this;
-	}
-
-	// The following methods are derived from "backtype.storm.generated.Nimbus.Client"
-
-	/**
-	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
-	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
-	 */
-	public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
-	}
-
-	/**
-	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
-	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
-	 */
-	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-
-		if (this.getTopologyJobId(name) != null) {
-			throw new AlreadyAliveException();
-		}
-
-		final URI uploadedJarUri;
-		final URL uploadedJarUrl;
-		try {
-			uploadedJarUri = new File(uploadedJarLocation).getAbsoluteFile().toURI();
-			uploadedJarUrl = uploadedJarUri.toURL();
-			JobWithJars.checkJarFile(uploadedJarUrl);
-		} catch (final IOException e) {
-			throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e);
-		}
-
-		try {
-			FlinkClient.addStormConfigToTopology(topology, conf);
-		} catch (ClassNotFoundException e) {
-			LOG.error("Could not register class for Kryo serialization.", e);
-			throw new InvalidTopologyException("Could not register class for Kryo serialization.");
-		}
-
-		final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
-		streamGraph.setJobName(name);
-
-		final JobGraph jobGraph = streamGraph.getJobGraph();
-		jobGraph.addJar(new Path(uploadedJarUri));
-
-		final Configuration configuration = jobGraph.getJobConfiguration();
-		configuration.setString(JobManagerOptions.ADDRESS, jobManagerHost);
-		configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-		final StandaloneClusterClient client;
-		try {
-			client = new StandaloneClusterClient(configuration);
-		} catch (final Exception e) {
-			throw new RuntimeException("Could not establish a connection to the job manager", e);
-		}
-
-		try {
-			ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
-					Collections.<URL>singletonList(uploadedJarUrl),
-					Collections.<URL>emptyList(),
-					this.getClass().getClassLoader());
-			client.runDetached(jobGraph, classLoader);
-		} catch (final ProgramInvocationException e) {
-			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
-		}
-	}
-
-	public void killTopology(final String name) throws NotAliveException {
-		this.killTopologyWithOpts(name, null);
-	}
-
-	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
-		final JobID jobId = this.getTopologyJobId(name);
-		if (jobId == null) {
-			throw new NotAliveException("Storm topology with name " + name + " not found.");
-		}
-
-		if (options != null) {
-			try {
-				Thread.sleep(1000 * options.get_wait_secs());
-			} catch (final InterruptedException e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		configuration.setString(JobManagerOptions.ADDRESS, this.jobManagerHost);
-		configuration.setInteger(JobManagerOptions.PORT, this.jobManagerPort);
-
-		final StandaloneClusterClient client;
-		try {
-			client = new StandaloneClusterClient(configuration);
-		} catch (final Exception e) {
-			throw new RuntimeException("Could not establish a connection to the job manager", e);
-		}
-
-		try {
-			client.stop(jobId);
-		} catch (final Exception e) {
-			throw new RuntimeException("Cannot stop job.", e);
-		}
-
-	}
-
-	// Flink specific additional methods
-
-	/**
-	 * Package-internal method to get a Flink {@link JobID} from a Storm topology name.
-	 *
-	 * @param id
-	 * 		The Storm topology name.
-	 * @return Flink's internally used {@link JobID}.
-	 */
-	JobID getTopologyJobId(final String id) {
-		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		if (this.timeout != null) {
-			configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout);
-		}
-
-		try {
-			final ActorRef jobManager = this.getJobManager();
-
-			final FiniteDuration askTimeout = this.getTimeout();
-			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
-					new Timeout(askTimeout));
-
-			final Object result;
-			try {
-				result = Await.result(response, askTimeout);
-			} catch (final Exception e) {
-				throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
-			}
-
-			if (result instanceof RunningJobsStatus) {
-				final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
-
-				for (final JobStatusMessage status : jobs) {
-					if (status.getJobName().equals(id)) {
-						return status.getJobId();
-					}
-				}
-			} else {
-				throw new RuntimeException("ReqeustRunningJobs requires a response of type "
-						+ "RunningJobs. Instead the response is of type " + result.getClass() + ".");
-			}
-		} catch (final IOException e) {
-			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-					+ ":" + this.jobManagerPort, e);
-		}
-
-		return null;
-	}
-
-	private FiniteDuration getTimeout() {
-		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		if (this.timeout != null) {
-			configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout);
-		}
-
-		return AkkaUtils.getClientTimeout(configuration);
-	}
-
-	private ActorRef getJobManager() throws IOException {
-		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-
-		ActorSystem actorSystem;
-		try {
-			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
-			actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
-					systemEndpoint));
-		} catch (final Exception e) {
-			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
-		}
-
-		final String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
-			jobManagerHost,
-			jobManagerPort,
-			JobMaster.JOB_MANAGER_NAME,
-			AddressResolution.TRY_ADDRESS_RESOLUTION,
-			configuration);
-
-		return AkkaUtils.getActorRef(jobManagerAkkaUrl, actorSystem, AkkaUtils.getLookupTimeout(configuration));
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	static void addStormConfigToTopology(FlinkTopology topology, Map conf) throws ClassNotFoundException {
-		if (conf != null) {
-			ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig();
-
-			flinkConfig.setGlobalJobParameters(new StormConfig(conf));
-
-			// add all registered types to ExecutionConfig
-			List<?> registeredClasses = (List<?>) conf.get(Config.TOPOLOGY_KRYO_REGISTER);
-			if (registeredClasses != null) {
-				for (Object klass : registeredClasses) {
-					if (klass instanceof String) {
-						flinkConfig.registerKryoType(Class.forName((String) klass));
-					} else {
-						for (Entry<String, String> register : ((Map<String, String>) klass).entrySet()) {
-							flinkConfig.registerTypeWithKryoSerializer(Class.forName(register.getKey()),
-									(Class<? extends Serializer<?>>) Class.forName(register.getValue()));
-						}
-					}
-				}
-			}
-		}
-	}
-}
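
What addStormConfigToTopology derived from Storm's TOPOLOGY_KRYO_REGISTER entries can be expressed directly on the ExecutionConfig. A minimal sketch; MyType and MySerializer are illustrative stand-ins, not existing classes:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoRegistrationSketch {

	public static class MyType {
		int value;
	}

	public static class MySerializer extends Serializer<MyType> {
		@Override
		public void write(Kryo kryo, Output output, MyType object) {
			output.writeInt(object.value);
		}

		@Override
		public MyType read(Kryo kryo, Input input, Class<MyType> type) {
			MyType result = new MyType();
			result.value = input.readInt();
			return result;
		}
	}

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		ExecutionConfig config = env.getConfig();

		// plain type registration, like a String entry in TOPOLOGY_KRYO_REGISTER
		config.registerKryoType(MyType.class);
		// type-to-serializer registration, like a Map entry in TOPOLOGY_KRYO_REGISTER
		config.registerTypeWithKryoSerializer(MyType.class, MySerializer.class);
	}
}
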
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
deleted file mode 100644
index 655978ffa4d..00000000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.RebalanceOptions;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.TopologyInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
- */
-public class FlinkLocalCluster {
-
-	/** The log used by this mini cluster. */
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
-
-	/** The Flink mini cluster on which to execute the programs. */
-	private FlinkMiniCluster flink;
-
-	/** Configuration key for submitting a topology in blocking mode if the flag is set to {@code true}. */
-	public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";
-
-	public FlinkLocalCluster() {
-	}
-
-	public FlinkLocalCluster(FlinkMiniCluster flink) {
-		this.flink = Objects.requireNonNull(flink);
-	}
-
-	@SuppressWarnings("rawtypes")
-	public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
-			throws Exception {
-		this.submitTopologyWithOpts(topologyName, conf, topology, null);
-	}
-
-	@SuppressWarnings("rawtypes")
-	public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
-		LOG.info("Running Storm topology on FlinkLocalCluster");
-
-		boolean submitBlocking = false;
-		if (conf != null) {
-			Object blockingFlag = conf.get(SUBMIT_BLOCKING);
-			if (blockingFlag instanceof Boolean) {
-				submitBlocking = ((Boolean) blockingFlag).booleanValue();
-			}
-		}
-
-		FlinkClient.addStormConfigToTopology(topology, conf);
-
-		StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
-		streamGraph.setJobName(topologyName);
-
-		JobGraph jobGraph = streamGraph.getJobGraph();
-
-		if (this.flink == null) {
-			Configuration configuration = new Configuration();
-			configuration.addAll(jobGraph.getJobConfiguration());
-
-			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
-			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
-
-			this.flink = new LocalFlinkMiniCluster(configuration, true);
-			this.flink.start();
-		}
-
-		if (submitBlocking) {
-			this.flink.submitJobAndWait(jobGraph, false);
-		} else {
-			this.flink.submitJobDetached(jobGraph);
-		}
-	}
-
-	public void killTopology(final String topologyName) {
-		this.killTopologyWithOpts(topologyName, null);
-	}
-
-	public void killTopologyWithOpts(final String name, final KillOptions options) {
-	}
-
-	public void activate(final String topologyName) {
-	}
-
-	public void deactivate(final String topologyName) {
-	}
-
-	public void rebalance(final String name, final RebalanceOptions options) {
-	}
-
-	public void shutdown() {
-		if (this.flink != null) {
-			this.flink.stop();
-			this.flink = null;
-		}
-	}
-
-	public String getTopologyConf(final String id) {
-		return null;
-	}
-
-	public StormTopology getTopology(final String id) {
-		return null;
-	}
-
-	public ClusterSummary getClusterInfo() {
-		return null;
-	}
-
-	public TopologyInfo getTopologyInfo(final String id) {
-		return null;
-	}
-
-	public Map<?, ?> getState() {
-		return null;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Access to default local cluster
-	// ------------------------------------------------------------------------
-
-	// The factory used to create the {@link FlinkLocalCluster} for execution; replaceable, e.g. by ITCases
-	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
-
-	/**
-	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no factory was set via
-	 * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
-	 *
-	 * @return a {@link FlinkLocalCluster} to be used for execution
-	 */
-	public static FlinkLocalCluster getLocalCluster() {
-		return currentFactory.createLocalCluster();
-	}
-
-	/**
-	 * Sets a different factory for FlinkLocalClusters to be used for execution.
-	 *
-	 * @param clusterFactory
-	 * 		The LocalClusterFactory to create the local clusters for execution.
-	 */
-	public static void initialize(LocalClusterFactory clusterFactory) {
-		currentFactory = Objects.requireNonNull(clusterFactory);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Cluster factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A factory that creates local clusters.
-	 */
-	public interface LocalClusterFactory {
-
-		/**
-		 * Creates a local Flink cluster.
-		 * @return A local Flink cluster.
-		 */
-		FlinkLocalCluster createLocalCluster();
-	}
-
-	/**
-	 * A factory that instantiates a FlinkLocalCluster.
-	 */
-	public static class DefaultLocalClusterFactory implements LocalClusterFactory {
-
-		@Override
-		public FlinkLocalCluster createLocalCluster() {
-			return new FlinkLocalCluster();
-		}
-	}
-}
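
With FlinkLocalCluster gone, the remaining way to run a Spout locally is the plain DataStream API plus SpoutWrapper (which this PR keeps). A hedged sketch of that path, assuming SpoutWrapper's single-argument constructor and a user-provided IRichSpout named WordCountSpout:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> lines = env.addSource(
            new SpoutWrapper<String>(new WordCountSpout()), // embed the Storm spout as a Flink source
            "spout-source",
            TypeInformation.of(String.class));

    lines.print();
    env.execute("embedded-spout-example"); // takes the place of FlinkLocalCluster.submitTopology()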
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
deleted file mode 100644
index b1e8a47d8fd..00000000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
- * {@link org.apache.storm.topology.IRichSpout spout} or {@link org.apache.storm.topology.IRichBolt bolt}.<br />
- * <br />
- * <strong>CAUTION: Flink does not support direct emit.</strong>
- */
-final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-	/** The declared output streams and schemas. */
-	final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
-
-	@Override
-	public void declare(final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code direct} is {@code true}
-	 */
-	@Override
-	public void declare(final boolean direct, final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-	}
-
-	@Override
-	public void declareStream(final String streamId, final Fields fields) {
-		this.declareStream(streamId, false, fields);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code direct} is {@code true}
-	 */
-	@Override
-	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if (direct) {
-			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-		}
-
-		this.outputStreams.put(streamId, fields);
-	}
-
-	/**
-	 * Returns {@link TypeInformation} for the declared output schema for a specific stream.
-	 *
-	 * @param streamId
-	 *            A stream ID.
-	 *
-	 * @return output type information for the declared output schema of the specified stream; or {@code null} if
-	 *         {@code streamId == null}
-	 *
-	 * @throws IllegalArgumentException
-	 *             If no output schema was declared for the specified stream or if more than 24 attributes were declared.
-	 */
-	TypeInformation<Tuple> getOutputType(final String streamId) throws IllegalArgumentException {
-		if (streamId == null) {
-			return null;
-		}
-
-		Fields outputSchema = this.outputStreams.get(streamId);
-		if (outputSchema == null) {
-			throw new IllegalArgumentException("Stream with ID '" + streamId
-					+ "' was not declared.");
-		}
-
-		Tuple t;
-		final int numberOfAttributes = outputSchema.size();
-
-		if (numberOfAttributes <= 24) {
-			try {
-				t = Tuple.getTupleClass(numberOfAttributes + 1).newInstance();
-			} catch (final InstantiationException e) {
-				throw new RuntimeException(e);
-			} catch (final IllegalAccessException e) {
-				throw new RuntimeException(e);
-			}
-		} else {
-			throw new IllegalArgumentException("Flink supports only a maximum number of 24 attributes");
-		}
-
-		// TODO: declare only key fields as DefaultComparable
-		for (int i = 0; i < numberOfAttributes + 1; ++i) {
-			t.setField(new DefaultComparable(), i);
-		}
-
-		return TypeExtractor.getForObject(t);
-	}
-
-	/**
-	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
-	 * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType(String)}. If key fields are not comparable,
-	 * Flink cannot use them and will throw an exception.
-	 */
-	private static class DefaultComparable implements Comparable<DefaultComparable> {
-
-		public DefaultComparable() {
-		}
-
-		@Override
-		public int compareTo(final DefaultComparable o) {
-			return 0;
-		}
-	}
-
-	/**
-	 * Computes the indexes within the declared output schema of the specified stream, for a list of given
-	 * field-grouping attributes.
-	 *
-	 * @param streamId
-	 *            A stream ID.
-	 * @param groupingFields
-	 *            The names of the key fields.
-	 *
-	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
-	 *         list
-	 */
-	int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
-		final int[] fieldIndexes = new int[groupingFields.size()];
-
-		for (int i = 0; i < fieldIndexes.length; ++i) {
-			fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
-		}
-
-		return fieldIndexes;
-	}
-
-}
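
Since FlinkOutputFieldsDeclarer and its getOutputType() are package-private, the following is only an illustration of the internal contract the class implemented, matching the code above: the derived Flink tuple type is one field wider than the declared Storm schema, and direct emit is rejected.

    FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
    declarer.declareStream("words", new Fields("word", "count"));

    // two declared attributes yield a Tuple3 type, because getOutputType()
    // instantiates Tuple.getTupleClass(numberOfAttributes + 1)
    TypeInformation<Tuple> type = declarer.getOutputType("words");

    // direct emit throws UnsupportedOperationException
    declarer.declareStream("direct", true, new Fields("word"));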
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
deleted file mode 100644
index 5b3f609e5ab..00000000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.utils.Utils;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Map;
-
-/**
- * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
- */
-public class FlinkSubmitter {
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkSubmitter.class);
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
-	 *
-	 * @param name
-	 * 		the name of the topology.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @param opts
-	 * 		to manipulate the starting of the topology.
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
-			final SubmitOptions opts)
-					throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology);
-	}
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. Progress
-	 * listeners are ignored because progress bars are not supported by Flink.
-	 *
-	 * @param name
-	 * 		the name of the topology.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		if (!Utils.isValidConf(stormConf)) {
-			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
-		}
-
-		final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
-		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
-			stormConf.put(Config.NIMBUS_HOST,
-					flinkConfig.getString(JobManagerOptions.ADDRESS, "localhost"));
-		}
-		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
-			stormConf.put(Config.NIMBUS_THRIFT_PORT,
-					new Integer(flinkConfig.getInteger(JobManagerOptions.PORT)));
-		}
-
-		final String serConf = JSONValue.toJSONString(stormConf);
-
-		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
-
-		try {
-			if (client.getTopologyJobId(name) != null) {
-				throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
-			}
-			String localJar = System.getProperty("storm.jar");
-			if (localJar == null) {
-				try {
-					for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
-							.getJars()) {
-						// TODO verify that there is only one jar
-						localJar = new File(url.toURI()).getAbsolutePath();
-					}
-				} catch (final URISyntaxException e) {
-					// ignore
-				} catch (final ClassCastException e) {
-					// ignore
-				}
-			}
-			Preconditions.checkNotNull(localJar, "LocalJar must not be null.");
-			LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
-			client.submitTopologyWithOpts(name, localJar, topology);
-		} catch (final InvalidTopologyException e) {
-			LOG.warn("Topology submission exception: " + e.get_msg());
-			throw e;
-		} catch (final AlreadyAliveException e) {
-			LOG.warn("Topology already alive exception", e);
-			throw e;
-		}
-
-		LOG.info("Finished submitting topology: " + name);
-	}
-
-	/**
-	 * Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
-	 * Flink.
-	 *
-	 * @param name
-	 * 		the name of the topology.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
-			final FlinkTopology topology)
-					throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology);
-	}
-
-	/**
-	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
-	 * returned value is the parameter localJar, because this gives the best integration of Storm behavior within a Flink
-	 * environment.
-	 *
-	 * @param conf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param localJar
-	 * 		file path of the jar file to submit
-	 * @return the value of parameter localJar
-	 */
-	@SuppressWarnings("rawtypes")
-	public static String submitJar(final Map conf, final String localJar) {
-		return submitJar(localJar);
-	}
-
-	/**
-	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
-	 * returned value is the parameter localJar, because this gives the best integration of Storm behavior within a Flink
-	 * environment.
-	 *
-	 * @param localJar
-	 * 		file path of the jar file to submit
-	 * @return the value of parameter localJar
-	 */
-	public static String submitJar(final String localJar) {
-		if (localJar == null) {
-			throw new RuntimeException(
-					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
-					"to upload");
-		}
-
-		return localJar;
-	}
-
-	/**
-	 * Dummy interface used to track the progress of file uploads. Does nothing; kept for compatibility.
-	 */
-	public interface FlinkProgressListener {
-	}
-
-}
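
As the code above shows, FlinkSubmitter back-filled missing nimbus settings from flink-conf.yaml before contacting the cluster. A condensed sketch of just that defaulting step, extracted from submitTopology():

    Configuration flinkConfig = GlobalConfiguration.loadConfiguration();

    Map<String, Object> stormConf = new HashMap<>();
    stormConf.putIfAbsent(Config.NIMBUS_HOST,
            flinkConfig.getString(JobManagerOptions.ADDRESS, "localhost")); // jobmanager.rpc.address
    stormConf.putIfAbsent(Config.NIMBUS_THRIFT_PORT,
            flinkConfig.getInteger(JobManagerOptions.PORT));                // jobmanager.rpc.port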
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
deleted file mode 100644
index 3b78a9089aa..00000000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.storm.util.SplitStreamMapper;
-import org.apache.flink.storm.util.SplitStreamType;
-import org.apache.flink.storm.util.StormStreamSelector;
-import org.apache.flink.storm.wrappers.BoltWrapper;
-import org.apache.flink.storm.wrappers.MergedInputsBoltWrapper;
-import org.apache.flink.storm.wrappers.SpoutWrapper;
-import org.apache.flink.storm.wrappers.StormTuple;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.Grouping;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.IRichStateSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program.
- * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
- */
-public class FlinkTopology {
-
-	/** All declared streams and output schemas by operator ID. */
-	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
-	/** All spout and bolt declarers by their ID. */
-	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
-
-	private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
-			new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
-
-	final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
-
-	private final TopologyBuilder builder;
-
-	// needs to be a class member for internal testing purposes
-	private final StormTopology stormTopology;
-
-	private final Map<String, IRichSpout> spouts;
-	private final Map<String, IRichBolt> bolts;
-
-	private final StreamExecutionEnvironment env;
-
-	private FlinkTopology(TopologyBuilder builder) {
-		this.builder = builder;
-		this.stormTopology = builder.createTopology();
-		// extract the spouts and bolts
-		this.spouts = getPrivateField("_spouts");
-		this.bolts = getPrivateField("_bolts");
-
-		this.env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// Kick off the translation immediately
-		translateTopology();
-	}
-
-	/**
-	 * Creates a Flink program that uses the specified spouts and bolts.
-	 * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
-	 * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
-	 */
-	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
-		return new FlinkTopology(stormBuilder);
-	}
-
-	/**
-	 * Returns the underlying Flink {@link StreamExecutionEnvironment} for the Storm topology.
-	 * @return The contextual environment (local or remote).
-	 */
-	public StreamExecutionEnvironment getExecutionEnvironment() {
-		return this.env;
-	}
-
-	/**
-	 * Directly executes the Storm topology based on the current context (local when in IDE and
-	 * remote when executed through ./bin/flink).
-	 * @return The Flink {@link JobExecutionResult} after the execution of the Storm topology.
-	 * @throws Exception which occurs during execution of the translated Storm topology.
-	 */
-	public JobExecutionResult execute() throws Exception {
-		return env.execute();
-	}
-
-	@SuppressWarnings("unchecked")
-	private <T> Map<String, T> getPrivateField(String field) {
-		try {
-			Field f = builder.getClass().getDeclaredField(field);
-			f.setAccessible(true);
-			return copyObject((Map<String, T>) f.get(builder));
-		} catch (NoSuchFieldException | IllegalAccessException e) {
-			throw new RuntimeException("Couldn't get " + field + " from TopologyBuilder", e);
-		}
-	}
-
-	private <T> T copyObject(T object) {
-		try {
-			return InstantiationUtil.deserializeObject(
-					InstantiationUtil.serializeObject(object),
-					getClass().getClassLoader()
-					);
-		} catch (IOException | ClassNotFoundException e) {
-			throw new RuntimeException("Failed to copy object.", e);
-		}
-	}
-
-	/**
-	 * Creates a Flink program that uses the specified spouts and bolts.
-	 */
-	private void translateTopology() {
-
-		unprocessdInputsPerBolt.clear();
-		outputStreams.clear();
-		declarers.clear();
-		availableInputs.clear();
-
-		// Storm defaults to parallelism 1
-		env.setParallelism(1);
-
-		/* Translation of topology */
-
-		for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
-			final String spoutId = spout.getKey();
-			final IRichSpout userSpout = spout.getValue();
-
-			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-			userSpout.declareOutputFields(declarer);
-			final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
-			this.outputStreams.put(spoutId, sourceStreams);
-			declarers.put(spoutId, declarer);
-
-			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
-			final DataStreamSource<?> source;
-
-			if (sourceStreams.size() == 1) {
-				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
-				spoutWrapperSingleOutput.setStormTopology(stormTopology);
-
-				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
-
-				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
-						declarer.getOutputType(outputStreamId));
-
-				outputStreams.put(outputStreamId, src);
-				source = src;
-			} else {
-				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
-						userSpout, spoutId, null, null);
-				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
-
-				@SuppressWarnings({ "unchecked", "rawtypes" })
-				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
-						spoutWrapperMultipleOutputs, spoutId,
-						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
-
-				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
-						.split(new StormStreamSelector<Tuple>());
-				for (String streamId : sourceStreams.keySet()) {
-					SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId)
-							.map(new SplitStreamMapper<Tuple>());
-					outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
-					outputStreams.put(streamId, outStream);
-				}
-				source = multiSource;
-			}
-			availableInputs.put(spoutId, outputStreams);
-
-			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
-			if (common.is_set_parallelism_hint()) {
-				int dop = common.get_parallelism_hint();
-				source.setParallelism(dop);
-			} else {
-				common.set_parallelism_hint(1);
-			}
-		}
-
-		/*
-		 * 1. Connect all spout streams with bolt streams.
-		 * 2. Then proceed with the bolt streams already connected.
-		 *
-		 * Because we do not know the order in which an iterator steps over a set, we might process a consumer
-		 * before its producer; thus, we might need to repeat multiple times.
-		 */
-		boolean makeProgress = true;
-		while (bolts.size() > 0) {
-			if (!makeProgress) {
-				StringBuilder strBld = new StringBuilder();
-				strBld.append("Unable to build Topology. Could not connect the following bolts:");
-				for (String boltId : bolts.keySet()) {
-					strBld.append("\n  ");
-					strBld.append(boltId);
-					strBld.append(": missing input streams [");
-					for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt
-							.get(boltId)) {
-						strBld.append("'");
-						strBld.append(streams.getKey().get_streamId());
-						strBld.append("' from '");
-						strBld.append(streams.getKey().get_componentId());
-						strBld.append("'; ");
-					}
-					strBld.append("]");
-				}
-
-				throw new RuntimeException(strBld.toString());
-			}
-			makeProgress = false;
-
-			final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
-			while (boltsIterator.hasNext()) {
-
-				final Entry<String, IRichBolt> bolt = boltsIterator.next();
-				final String boltId = bolt.getKey();
-				final IRichBolt userBolt = copyObject(bolt.getValue());
-
-				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
-
-				Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
-				if (unprocessedBoltInputs == null) {
-					unprocessedBoltInputs = new HashSet<>();
-					unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
-					unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
-				}
-
-				// check if all inputs are available
-				final int numberOfInputs = unprocessedBoltInputs.size();
-				int inputsAvailable = 0;
-				for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
-					final String producerId = entry.getKey().get_componentId();
-					final String streamId = entry.getKey().get_streamId();
-					final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
-					if (streams != null && streams.get(streamId) != null) {
-						inputsAvailable++;
-					}
-				}
-
-				if (inputsAvailable != numberOfInputs) {
-					// traverse other bolts first until inputs are available
-					continue;
-				} else {
-					makeProgress = true;
-					boltsIterator.remove();
-				}
-
-				final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);
-
-				for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
-					final GlobalStreamId streamId = input.getKey();
-					final Grouping grouping = input.getValue();
-
-					final String producerId = streamId.get_componentId();
-
-					final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
-
-					inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
-				}
-
-				final SingleOutputStreamOperator<?> outputStream = createOutput(boltId,
-						userBolt, inputStreams);
-
-				if (common.is_set_parallelism_hint()) {
-					int dop = common.get_parallelism_hint();
-					outputStream.setParallelism(dop);
-				} else {
-					common.set_parallelism_hint(1);
-				}
-
-			}
-		}
-	}
-
-	private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
-			GlobalStreamId streamId, Grouping grouping,
-			Map<String, DataStream<Tuple>> producer) {
-
-		assert (userBolt != null);
-		assert (boltId != null);
-		assert (streamId != null);
-		assert (grouping != null);
-		assert (producer != null);
-
-		final String producerId = streamId.get_componentId();
-		final String inputStreamId = streamId.get_streamId();
-
-		DataStream<Tuple> inputStream = producer.get(inputStreamId);
-
-		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-		declarers.put(boltId, declarer);
-		userBolt.declareOutputFields(declarer);
-		this.outputStreams.put(boltId, declarer.outputStreams);
-
-		// if producer was processed already
-		if (grouping.is_set_shuffle()) {
-			// Storm uses a round-robin shuffle strategy
-			inputStream = inputStream.rebalance();
-		} else if (grouping.is_set_fields()) {
-			// global grouping is emulated in Storm via an empty fields grouping list
-			final List<String> fields = grouping.get_fields();
-			if (fields.size() > 0) {
-				FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
-				inputStream = inputStream.keyBy(prodDeclarer
-						.getGroupingFieldIndexes(inputStreamId,
-								grouping.get_fields()));
-			} else {
-				inputStream = inputStream.global();
-			}
-		} else if (grouping.is_set_all()) {
-			inputStream = inputStream.broadcast();
-		} else if (!grouping.is_set_local_or_shuffle()) {
-			throw new UnsupportedOperationException(
-					"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
-		}
-
-		return inputStream;
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private SingleOutputStreamOperator<?> createOutput(String boltId, IRichBolt bolt,
-			Map<GlobalStreamId, DataStream<Tuple>> inputStreams) {
-		assert (boltId != null);
-		assert (bolt != null);
-		assert (inputStreams != null);
-
-		Iterator<Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet()
-				.iterator();
-
-		Entry<GlobalStreamId, DataStream<Tuple>> input1 = iterator.next();
-		GlobalStreamId streamId1 = input1.getKey();
-		String inputStreamId1 = streamId1.get_streamId();
-		String inputComponentId1 = streamId1.get_componentId();
-		Fields inputSchema1 = this.outputStreams.get(inputComponentId1).get(inputStreamId1);
-		DataStream<Tuple> singleInputStream = input1.getValue();
-
-		DataStream<StormTuple<Tuple>> mergedInputStream = null;
-		while (iterator.hasNext()) {
-			Entry<GlobalStreamId, DataStream<Tuple>> input2 = iterator.next();
-			GlobalStreamId streamId2 = input2.getKey();
-			DataStream<Tuple> inputStream2 = input2.getValue();
-
-			if (mergedInputStream == null) {
-				mergedInputStream = singleInputStream
-						.connect(inputStream2)
-						.flatMap(
-								new TwoFlinkStreamsMerger(streamId1, inputSchema1,
-										streamId2, this.outputStreams.get(
-												streamId2.get_componentId()).get(
-														streamId2.get_streamId())))
-														.returns(StormTuple.class);
-			} else {
-				mergedInputStream = mergedInputStream
-						.connect(inputStream2)
-						.flatMap(
-								new StormFlinkStreamMerger(streamId2, this.outputStreams.get(
-										streamId2.get_componentId()).get(streamId2.get_streamId())))
-										.returns(StormTuple.class);
-			}
-		}
-
-		final HashMap<String, Fields> boltOutputs = this.outputStreams.get(boltId);
-		final FlinkOutputFieldsDeclarer declarer = this.declarers.get(boltId);
-
-		final SingleOutputStreamOperator<?> outputStream;
-
-		if (boltOutputs.size() < 2) { // single output stream or sink
-			String outputStreamId;
-			if (boltOutputs.size() == 1) {
-				outputStreamId = (String) boltOutputs.keySet().toArray()[0];
-			} else {
-				outputStreamId = null;
-			}
-
-			final TypeInformation<Tuple> outType = declarer.getOutputType(outputStreamId);
-
-			final SingleOutputStreamOperator<Tuple> outStream;
-
-			// only one input
-			if (inputStreams.entrySet().size() == 1) {
-				BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<>(bolt, boltId,
-						inputStreamId1, inputComponentId1, inputSchema1, null);
-				boltWrapper.setStormTopology(stormTopology);
-				outStream = singleInputStream.transform(boltId, outType, boltWrapper);
-			} else {
-				MergedInputsBoltWrapper<Tuple, Tuple> boltWrapper = new MergedInputsBoltWrapper<Tuple, Tuple>(
-						bolt, boltId, null);
-				boltWrapper.setStormTopology(stormTopology);
-				outStream = mergedInputStream.transform(boltId, outType, boltWrapper);
-			}
-
-			if (outType != null) {
-				// only for non-sink nodes
-				final HashMap<String, DataStream<Tuple>> op = new HashMap<>();
-				op.put(outputStreamId, outStream);
-				availableInputs.put(boltId, op);
-			}
-			outputStream = outStream;
-		} else {
-			final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor
-					.getForClass(SplitStreamType.class);
-
-			final SingleOutputStreamOperator<SplitStreamType<Tuple>> multiStream;
-
-			// only one input
-			if (inputStreams.entrySet().size() == 1) {
-				final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>(
-						bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null);
-				boltWrapperMultipleOutputs.setStormTopology(stormTopology);
-				multiStream = singleInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
-			} else {
-				final MergedInputsBoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new MergedInputsBoltWrapper<Tuple, SplitStreamType<Tuple>>(
-						bolt, boltId, null);
-				boltWrapperMultipleOutputs.setStormTopology(stormTopology);
-				multiStream = mergedInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
-			}
-
-			final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
-					.split(new StormStreamSelector<Tuple>());
-
-			final HashMap<String, DataStream<Tuple>> op = new HashMap<>();
-			for (String outputStreamId : boltOutputs.keySet()) {
-				// select each output stream exactly once and register the typed result
-				SingleOutputStreamOperator<Tuple> outStream = splitStream
-						.select(outputStreamId).map(new SplitStreamMapper<Tuple>());
-				outStream.getTransformation().setOutputType(declarer.getOutputType(outputStreamId));
-				op.put(outputStreamId, outStream);
-			}
-			availableInputs.put(boltId, op);
-			outputStream = multiStream;
-		}
-
-		return outputStream;
-	}
-
-	// for internal testing purposes only
-	public StormTopology getStormTopology() {
-		return this.stormTopology;
-	}
-}
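
The core of the removed translator is the grouping mapping in processInput(). A self-contained restatement of that mapping under the same semantics; keyIndexes stands in for the field indexes the declarer computed for a fields grouping:

    static DataStream<Tuple> applyGrouping(DataStream<Tuple> in, Grouping grouping, int[] keyIndexes) {
        if (grouping.is_set_shuffle()) {
            return in.rebalance();                 // Storm shuffle is round-robin
        } else if (grouping.is_set_fields()) {
            return grouping.get_fields().isEmpty()
                    ? in.global()                  // empty fields list emulates global grouping
                    : in.keyBy(keyIndexes);
        } else if (grouping.is_set_all()) {
            return in.broadcast();
        } else if (grouping.is_set_local_or_shuffle()) {
            return in;                             // forwarded unchanged
        }
        throw new UnsupportedOperationException(
                "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
    }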
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java
deleted file mode 100644
index 00c467e3da7..00000000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.storm.wrappers.StormTuple;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.MessageId;
-
-/**
- * Merges a stream of type {@link StormTuple} with a Flink {@link org.apache.flink.streaming.api.datastream.DataStream} into a stream of type {@link StormTuple}.
- */
-@SuppressWarnings("rawtypes")
-final class StormFlinkStreamMerger<IN1, IN2> implements CoFlatMapFunction<StormTuple<IN1>, IN2, StormTuple> {
-	private static final long serialVersionUID = -914164633830563631L;
-
-	private final String inputStreamId;
-	private final String inputComponentId;
-	private final Fields inputSchema;
-
-	public StormFlinkStreamMerger(GlobalStreamId streamId, Fields schema) {
-		this.inputStreamId = streamId.get_streamId();
-		this.inputComponentId = streamId.get_componentId();
-		this.inputSchema = schema;
-	}
-
-	@Override
-	public void flatMap1(StormTuple<IN1> value, Collector<StormTuple> out) throws Exception {
-		out.collect(value);
-	}
-
-	@Override
-	public void flatMap2(IN2 value, Collector<StormTuple> out) throws Exception {
-		out.collect(new StormTuple<IN2>(value, this.inputSchema, 0, this.inputStreamId,
-				this.inputComponentId, MessageId.makeUnanchored()));
-	}
-}
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java
deleted file mode 100644
index f7bcb12c811..00000000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.storm.wrappers.StormTuple;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.MessageId;
-
-/**
- * Merges two Flink {@link org.apache.flink.streaming.api.datastream.DataStream}s into a stream of type {@link StormTuple}.
- */
-@SuppressWarnings("rawtypes")
-final class TwoFlinkStreamsMerger<IN1, IN2> implements CoFlatMapFunction<IN1, IN2, StormTuple> {
-	private static final long serialVersionUID = -495174165824062256L;
-
-	private final String inputStreamId1;
-	private final String inputComponentId1;
-	private final Fields inputSchema1;
-	private final String inputStreamId2;
-	private final String inputComponentId2;
-	private final Fields inputSchema2;
-
-	public TwoFlinkStreamsMerger(GlobalStreamId streamId1, Fields schema1, GlobalStreamId streamId2,
-			Fields schema2) {
-		this.inputStreamId1 = streamId1.get_streamId();
-		this.inputComponentId1 = streamId1.get_componentId();
-		this.inputSchema1 = schema1;
-		this.inputStreamId2 = streamId2.get_streamId();
-		this.inputComponentId2 = streamId2.get_componentId();
-		this.inputSchema2 = schema2;
-	}
-
-	@Override
-	public void flatMap1(IN1 value, Collector<StormTuple> out) throws Exception {
-		out.collect(new StormTuple<IN1>(value, this.inputSchema1, 0,
-				this.inputStreamId1, this.inputComponentId1, MessageId.makeUnanchored()));
-	}
-
-	@Override
-	public void flatMap2(IN2 value, Collector<StormTuple> out) throws Exception {
-		out.collect(new StormTuple<IN2>(value, this.inputSchema2, 0,
-				this.inputStreamId2, this.inputComponentId2, MessageId.makeUnanchored()));
-	}
-}
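
The two mergers above were chained inside the removed createOutput(): the first pair of inputs went through TwoFlinkStreamsMerger, each additional input through StormFlinkStreamMerger. A sketch of that wiring; stream1..stream3, id1..id3, and schema1..schema3 are placeholders for the producer streams, their GlobalStreamIds, and their Fields:

    DataStream<StormTuple> merged = stream1
            .connect(stream2)
            .flatMap(new TwoFlinkStreamsMerger<>(id1, schema1, id2, schema2))
            .returns(StormTuple.class);

    merged = merged
            .connect(stream3)
            .flatMap(new StormFlinkStreamMerger<>(id3, schema3))
            .returns(StormTuple.class);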
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
index 33ba3748a7a..e8007266e53 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.storm.util;
 
-import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 
 import java.util.ArrayList;
@@ -26,7 +25,7 @@
 import java.util.List;
 
 /**
- * Used by {@link FlinkTopology} to split multiple declared output streams within Flink.
+ * Used to split multiple declared output streams within Flink.
  */
 public final class StormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
 	private static final long serialVersionUID = 2553423379715401023L;
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index ba2435e94d3..1c12290eb26 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -18,7 +18,6 @@
 package org.apache.flink.storm.wrappers;
 
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
@@ -28,9 +27,6 @@
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.Grouping;
-import org.apache.storm.generated.StormTopology;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.IRichBolt;
@@ -40,7 +36,6 @@
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Map;
 
 import static java.util.Arrays.asList;
 
@@ -68,15 +63,9 @@
 	/** Number of attributes of the bolt's output tuples per stream. */
 	private final HashMap<String, Integer> numberOfAttributes;
 
-	/** The original Storm topology. */
-	private StormTopology stormTopology;
 	/** The topology context of the bolt. */
 	private transient TopologyContext topologyContext;
 
-	/** The IDs of the input streams for this bolt per producer task ID. */
-	private final HashMap<Integer, String> inputStreamIds = new HashMap<Integer, String>();
-	/** The IDs of the producers for this bolt per producer task ID. */
-	private final HashMap<Integer, String> inputComponentIds = new HashMap<Integer, String>();
 	/** The schema (ie, ordered field names) of the input streams per producer taskID. */
 	private final HashMap<Integer, Fields> inputSchemas = new HashMap<Integer, Fields>();
 
@@ -242,16 +231,6 @@ public BoltWrapper(final IRichBolt bolt, final String name, final String inputSt
 		this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
 	}
 
-	/**
-	 * Sets the original Storm topology.
-	 *
-	 * @param stormTopology
-	 *            The original Storm topology.
-	 */
-	public void setStormTopology(StormTopology stormTopology) {
-		this.stormTopology = stormTopology;
-	}
-
 	@Override
 	public void open() throws Exception {
 		super.open();
@@ -270,25 +249,11 @@ public void open() throws Exception {
 		}
 
 		this.topologyContext = WrapperSetupHelper.createTopologyContext(
-				getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
+				getRuntimeContext(), this.bolt, this.name, stormConfig);
 
 		final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
 				this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector));
 
-		if (this.stormTopology != null) {
-			Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();
-
-			for (GlobalStreamId inputStream : inputs.keySet()) {
-				for (Integer tid : this.topologyContext.getComponentTasks(inputStream
-						.get_componentId())) {
-					this.inputComponentIds.put(tid, inputStream.get_componentId());
-					this.inputStreamIds.put(tid, inputStream.get_streamId());
-					this.inputSchemas.put(tid,
-							this.topologyContext.getComponentOutputFields(inputStream));
-				}
-			}
-		}
-
 		this.bolt.prepare(stormConfig, this.topologyContext, stormCollector);
 	}
 
@@ -304,17 +269,7 @@ public void processElement(final StreamRecord<IN> element) throws Exception {
 
 		IN value = element.getValue();
 
-		if (this.stormTopology != null) {
-			Tuple tuple = (Tuple) value;
-			Integer producerTaskId = tuple.getField(tuple.getArity() - 1);
-
-			this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId),
-					producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds
-					.get(producerTaskId), MessageId.makeUnanchored()));
-
-		} else {
-			this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
-					MessageId.makeUnanchored()));
-		}
+		this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
+			MessageId.makeUnanchored()));
 	}
 }
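
After this change a bolt is embedded without any topology context, so BoltWrapper can be used directly on a DataStream. A hedged sketch, assuming BoltWrapper's single-argument constructor; BoltTokenizer stands in for any user-provided IRichBolt emitting (word, count) pairs:

    DataStream<String> text = env.fromElements("to be", "or not to be");

    DataStream<Tuple2<String, Integer>> tokens = text.transform(
            "tokenizer",                                                     // operator name
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}), // declared output type
            new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()));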
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 882ba273fef..eb103840f87 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -27,7 +27,6 @@
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import org.apache.storm.generated.StormTopology;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.IRichSpout;
@@ -66,8 +65,6 @@
 	private volatile boolean isRunning = true;
 	/** The number of {@link IRichSpout#nextTuple()} calls. */
 	private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop
-	/** The original Storm topology. */
-	private StormTopology stormTopology;
 
 	/**
 	 * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
@@ -229,16 +226,6 @@ public SpoutWrapper(final IRichSpout spout, final String name, final Collection<
 		this.numberOfInvocations = numberOfInvocations;
 	}
 
-	/**
-	 * Sets the original Storm topology.
-	 *
-	 * @param stormTopology
-	 *            The original Storm topology.
-	 */
-	public void setStormTopology(StormTopology stormTopology) {
-		this.stormTopology = stormTopology;
-	}
-
 	@Override
 	public final void run(final SourceContext<OUT> ctx) throws Exception {
 		final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
@@ -255,7 +242,7 @@ public final void run(final SourceContext<OUT> ctx) throws Exception {
 
 		final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
 				(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
-				this.stormTopology, stormConfig);
+				stormConfig);
 
 		SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
 				stormTopologyContext.getThisTaskId(), ctx);
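
SpoutWrapper itself is unchanged apart from the removed topology field: it can still bound an otherwise infinite source, since a non-null numberOfInvocations limits the nextTuple() calls (null means run forever). A short sketch, assuming the two-argument constructor; FiniteFileSpout is a placeholder for any IRichSpout:

    // stop after 1000 calls to nextTuple(), then let the Flink source finish
    SpoutWrapper<String> boundedSpout = new SpoutWrapper<>(new FiniteFileSpout(), 1000);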
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
index 1d3a544a830..03af852ac53 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -26,7 +26,6 @@
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StateSpoutSpec;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.IComponent;
 import org.apache.storm.topology.IRichBolt;
@@ -93,9 +92,6 @@
 		return declarer.outputSchemas;
 	}
 
-	/** Used to compute unique task IDs for a Storm topology. */
-	private static int tid;
-
 	/**
 	 * Creates a {@link TopologyContext} for a Spout or Bolt instance (ie, Flink task / Storm executor).
 	 *
@@ -103,8 +99,6 @@
 	 *            The Flink runtime context.
 	 * @param spoutOrBolt
 	 *            The Spout or Bolt this context is created for.
-	 * @param stormTopology
-	 *            The original Storm topology.
 	 * @param stormConfig
 	 *            The user provided configuration.
 	 * @return The created {@link TopologyContext}.
@@ -112,7 +106,7 @@
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	static synchronized TopologyContext createTopologyContext(
 			final StreamingRuntimeContext context, final IComponent spoutOrBolt,
-			final String operatorName, StormTopology stormTopology, final Map stormConfig) {
+			final String operatorName, final Map stormConfig) {
 
 		final int dop = context.getNumberOfParallelSubtasks();
 
@@ -131,66 +125,29 @@ static synchronized TopologyContext createTopologyContext(
 		final Map registeredMetrics = new HashMap();
 		Atom openOrPrepareWasCalled = null;
 
-		if (stormTopology == null) {
-			// embedded mode
-			ComponentCommon common = new ComponentCommon();
-			common.set_parallelism_hint(dop);
-
-			HashMap<String, SpoutSpec> spouts = new HashMap<String, SpoutSpec>();
-			HashMap<String, Bolt> bolts = new HashMap<String, Bolt>();
-			if (spoutOrBolt instanceof IRichSpout) {
-				spouts.put(operatorName, new SpoutSpec(null, common));
-			} else {
-				assert (spoutOrBolt instanceof IRichBolt);
-				bolts.put(operatorName, new Bolt(null, common));
-			}
-			stormTopology = new StormTopology(spouts, bolts, new HashMap<String, StateSpoutSpec>());
-
-			List<Integer> sortedTasks = new ArrayList<Integer>(dop);
-			for (int i = 1; i <= dop; ++i) {
-				taskToComponents.put(i, operatorName);
-				sortedTasks.add(i);
-			}
-			componentToSortedTasks.put(operatorName, sortedTasks);
+		ComponentCommon common = new ComponentCommon();
+		common.set_parallelism_hint(dop);
 
-			SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-			spoutOrBolt.declareOutputFields(declarer);
-			componentToStreamToFields.put(operatorName, declarer.outputStreams);
+		HashMap<String, SpoutSpec> spouts = new HashMap<String, SpoutSpec>();
+		HashMap<String, Bolt> bolts = new HashMap<String, Bolt>();
+		if (spoutOrBolt instanceof IRichSpout) {
+			spouts.put(operatorName, new SpoutSpec(null, common));
 		} else {
-			// whole topology is built (i.e. FlinkTopology is used)
-			Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
-			Map<String, Bolt> bolts = stormTopology.get_bolts();
-			Map<String, StateSpoutSpec> stateSpouts = stormTopology.get_state_spouts();
-
-			tid = 1;
-
-			for (Entry<String, SpoutSpec> spout : spouts.entrySet()) {
-				Integer rc = processSingleOperator(spout.getKey(), spout.getValue().get_common(),
-						operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
-						componentToSortedTasks, componentToStreamToFields);
-				if (rc != null) {
-					taskId = rc;
-				}
-			}
-			for (Entry<String, Bolt> bolt : bolts.entrySet()) {
-				Integer rc = processSingleOperator(bolt.getKey(), bolt.getValue().get_common(),
-						operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
-						componentToSortedTasks, componentToStreamToFields);
-				if (rc != null) {
-					taskId = rc;
-				}
-			}
-			for (Entry<String, StateSpoutSpec> stateSpout : stateSpouts.entrySet()) {
-				Integer rc = processSingleOperator(stateSpout.getKey(), stateSpout
-						.getValue().get_common(), operatorName, context.getIndexOfThisSubtask(),
-						dop, taskToComponents, componentToSortedTasks, componentToStreamToFields);
-				if (rc != null) {
-					taskId = rc;
-				}
-			}
+			assert (spoutOrBolt instanceof IRichBolt);
+			bolts.put(operatorName, new Bolt(null, common));
+		}
+		StormTopology stormTopology = new StormTopology(spouts, bolts, new HashMap<String, StateSpoutSpec>());
 
-			assert(taskId != null);
+		List<Integer> sortedTasks = new ArrayList<Integer>(dop);
+		for (int i = 1; i <= dop; ++i) {
+			taskToComponents.put(i, operatorName);
+			sortedTasks.add(i);
 		}
+		componentToSortedTasks.put(operatorName, sortedTasks);
+
+		SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		spoutOrBolt.declareOutputFields(declarer);
+		componentToStreamToFields.put(operatorName, declarer.outputStreams);
 
 		if (!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
 			stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // Storm default value
@@ -201,58 +158,4 @@ static synchronized TopologyContext createTopologyContext(
 				taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
 				registeredMetrics, openOrPrepareWasCalled);
 	}
-
-	/**
-	 * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, and {@code componentToStreamToFields} for a
-	 * single instance of a Spout or Bolt (ie, task or executor). Furthermore, it computes the unique task ID.
-	 *
-	 * @param componentId
-	 *            The ID of the Spout/Bolt in the topology.
-	 * @param common
-	 *            The common operator object (that is all Spouts and Bolts have).
-	 * @param operatorName
-	 *            The Flink operator name.
-	 * @param index
-	 *            The index of the currently processed task within its operator.
-	 * @param dop
-	 *            The parallelism of the operator.
-	 * @param taskToComponents
-	 *            OUTPUT: A map from all task IDs of the topology to their component IDs.
-	 * @param componentToSortedTasks
-	 *            OUTPUT: A map from all component IDs to their sorted list of corresponding task IDs.
-	 * @param componentToStreamToFields
-	 *            OUTPUT: A map from all component IDs to their output streams and output fields.
-	 *
-	 * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current
-	 *         Flink operator {@code operatorName} -- {@code null} otherwise.
-	 */
-	private static Integer processSingleOperator(final String componentId,
-			final ComponentCommon common, final String operatorName, final int index,
-			final int dop, final Map<Integer, String> taskToComponents,
-			final Map<String, List<Integer>> componentToSortedTasks,
-			final Map<String, Map<String, Fields>> componentToStreamToFields) {
-		final int parallelismHint = common.get_parallelism_hint();
-		Integer taskId = null;
-
-		if (componentId.equals(operatorName)) {
-			taskId = tid + index;
-		}
-
-		List<Integer> sortedTasks = new ArrayList<Integer>(dop);
-		for (int i = 0; i < parallelismHint; ++i) {
-			taskToComponents.put(tid, componentId);
-			sortedTasks.add(tid);
-			++tid;
-		}
-		componentToSortedTasks.put(componentId, sortedTasks);
-
-		Map<String, Fields> outputStreams = new HashMap<String, Fields>();
-		for (Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) {
-			outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields()));
-		}
-		componentToStreamToFields.put(componentId, outputStreams);
-
-		return taskId;
-	}
-
 }
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
deleted file mode 100644
index d035bb2784d..00000000000
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.storm.util.AbstractTest;
-
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.LinkedList;
-
-/**
- * Tests for the FlinkOutputFieldsDeclarer.
- */
-public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
-
-	@Test
-	public void testNull() {
-		Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
-	}
-
-	@Test
-	public void testDeclare() {
-		for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
-			for (int j = 1; j < 2; ++j) { // number of streams
-				for (int k = 0; k <= 24; ++k) { // number of attributes
-					this.runDeclareTest(i, j, k);
-				}
-			}
-		}
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareSimpleTooManyAttributes() {
-		this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 25);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareNonDirectTooManyAttributes() {
-		this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 25);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareDefaultStreamToManyAttributes() {
-		this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 25);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareFullToManyAttributes() {
-		this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 25);
-	}
-
-	private void runDeclareTest(final int testCase, final int numberOfStreams,
-			final int numberOfAttributes) {
-		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-
-		String[] streams = null;
-		if (numberOfStreams > 1 || r.nextBoolean()) {
-			streams = new String[numberOfStreams];
-			for (int i = 0; i < numberOfStreams; ++i) {
-				streams[i] = "stream" + i;
-			}
-		}
-
-		final String[] attributes = new String[numberOfAttributes];
-		for (int i = 0; i < attributes.length; ++i) {
-			attributes[i] = "a" + i;
-		}
-
-		switch (testCase) {
-		case 0:
-			this.declareSimple(declarer, streams, attributes);
-			break;
-		default:
-			this.declareNonDirect(declarer, streams, attributes);
-		}
-
-		if (streams == null) {
-			streams = new String[] { Utils.DEFAULT_STREAM_ID };
-		}
-
-		for (String stream : streams) {
-			final TypeInformation<?> type = declarer.getOutputType(stream);
-
-			Assert.assertEquals(numberOfAttributes + 1, type.getArity());
-			Assert.assertTrue(type.isTupleType());
-		}
-	}
-
-	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
-			final String[] attributes) {
-
-		if (streams != null) {
-			for (String stream : streams) {
-				declarer.declareStream(stream, new Fields(attributes));
-			}
-		} else {
-			declarer.declare(new Fields(attributes));
-		}
-	}
-
-	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
-			final String[] attributes) {
-
-		if (streams != null) {
-			for (String stream : streams) {
-				declarer.declareStream(stream, false, new Fields(attributes));
-			}
-		} else {
-			declarer.declare(false, new Fields(attributes));
-		}
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUndeclared() {
-		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-		declarer.getOutputType("unknownStreamId");
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareDirect() {
-		new FlinkOutputFieldsDeclarer().declare(true, null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareDirect2() {
-		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
-	}
-
-	@Test
-	public void testGetGroupingFieldIndexes() {
-		final int numberOfAttributes = 5 + this.r.nextInt(20);
-		final String[] attributes = new String[numberOfAttributes];
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			attributes[i] = "a" + i;
-		}
-
-		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-		declarer.declare(new Fields(attributes));
-
-		final int numberOfKeys = 1 + this.r.nextInt(24);
-		final LinkedList<String> groupingFields = new LinkedList<String>();
-		final boolean[] indexes = new boolean[numberOfAttributes];
-
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			if (this.r.nextInt(25) < numberOfKeys) {
-				groupingFields.add(attributes[i]);
-				indexes[i] = true;
-			} else {
-				indexes[i] = false;
-			}
-		}
-
-		final int[] expectedResult = new int[groupingFields.size()];
-		int j = 0;
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			if (indexes[i]) {
-				expectedResult[j++] = i;
-			}
-		}
-
-		final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
-				groupingFields);
-
-		Assert.assertEquals(expectedResult.length, result.length);
-		for (int i = 0; i < expectedResult.length; ++i) {
-			Assert.assertEquals(expectedResult[i], result[i]);
-		}
-	}
-
-}
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
deleted file mode 100644
index aaecc061b18..00000000000
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.storm.util.TestDummySpout;
-import org.apache.flink.storm.util.TestSink;
-
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for the FlinkTopology.
- */
-public class FlinkTopologyTest {
-
-	@Test
-	public void testDefaultParallelism() {
-		final TopologyBuilder builder = new TopologyBuilder();
-		final FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);
-		Assert.assertEquals(1, flinkTopology.getExecutionEnvironment().getParallelism());
-	}
-
-	@Test(expected = RuntimeException.class)
-	public void testUnknownSpout() {
-		TopologyBuilder builder = new TopologyBuilder();
-		builder.setSpout("spout", new TestSpout());
-		builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
-
-		FlinkTopology.createTopology(builder);
-	}
-
-	@Test(expected = RuntimeException.class)
-	public void testUnknownBolt() {
-		TopologyBuilder builder = new TopologyBuilder();
-		builder.setSpout("spout", new TestSpout());
-		builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
-		builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
-
-		FlinkTopology.createTopology(builder);
-	}
-
-	@Test(expected = RuntimeException.class)
-	public void testUndeclaredStream() {
-		TopologyBuilder builder = new TopologyBuilder();
-		builder.setSpout("spout", new TestSpout());
-		builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
-
-		FlinkTopology.createTopology(builder);
-	}
-
-	@Test
-	public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
-		TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout("spout", new TestDummySpout());
-		builder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
-				TestDummySpout.SPOUT_STREAM_ID, new Fields("id"));
-
-		FlinkTopology.createTopology(builder);
-	}
-
-	@Test
-	public void testFieldsGroupingOnMultipleBoltOutputStreams() {
-		TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout("spout", new TestDummySpout());
-		builder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout");
-		builder.setBolt("sink", new TestSink()).fieldsGrouping("bolt",
-				TestDummyBolt.GROUPING_STREAM_ID, new Fields("id"));
-
-		FlinkTopology.createTopology(builder);
-	}
-
-}
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
deleted file mode 100644
index 001e9c4a542..00000000000
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.Map;
-
-/**
- * A no-op test implementation of a {@link IRichBolt}.
- */
-public class TestBolt implements IRichBolt {
-	private static final long serialVersionUID = -667148827441397683L;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
-
-	@Override
-	public void execute(Tuple input) {}
-
-	@Override
-	public void cleanup() {}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
deleted file mode 100644
index 3466ff40971..00000000000
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.api;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-
-import java.util.Map;
-
-/**
- * A no-op test implementation of a {@link IRichSpout}.
- */
-public class TestSpout implements IRichSpout {
-	private static final long serialVersionUID = -4884029383198924007L;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
-
-	@Override
-	public void close() {}
-
-	@Override
-	public void activate() {}
-
-	@Override
-	public void deactivate() {}
-
-	@Override
-	public void nextTuple() {}
-
-	@Override
-	public void ack(Object msgId) {}
-
-	@Override
-	public void fail(Object msgId) {}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
deleted file mode 100644
index 2b0b2753f2c..00000000000
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.wrappers;
-
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.util.AbstractTest;
-import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.storm.util.TestDummySpout;
-import org.apache.flink.storm.util.TestSink;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IComponent;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the setup of wrappers in a local cluster.
- */
-public class WrapperSetupInLocalClusterTest extends AbstractTest {
-
-	@Test
-	public void testCreateTopologyContext() {
-		HashMap<String, Integer> dops = new HashMap<String, Integer>();
-		dops.put("spout1", 1);
-		dops.put("spout2", 3);
-		dops.put("bolt1", 1);
-		dops.put("bolt2", 2);
-		dops.put("sink", 1);
-
-		HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
-		taskCounter.put("spout1", 0);
-		taskCounter.put("spout2", 0);
-		taskCounter.put("bolt1", 0);
-		taskCounter.put("bolt2", 0);
-		taskCounter.put("sink", 0);
-
-		HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
-		operators.put("spout1", new TestDummySpout());
-		operators.put("spout2", new TestDummySpout());
-		operators.put("bolt1", new TestDummyBolt());
-		operators.put("bolt2", new TestDummyBolt());
-		operators.put("sink", new TestSink());
-
-		TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
-		builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
-		builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
-		builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
-		builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
-				.shuffleGrouping("bolt1", TestDummyBolt.GROUPING_STREAM_ID)
-				.shuffleGrouping("bolt1", TestDummyBolt.SHUFFLE_STREAM_ID)
-				.shuffleGrouping("bolt2", TestDummyBolt.GROUPING_STREAM_ID)
-				.shuffleGrouping("bolt2", TestDummyBolt.SHUFFLE_STREAM_ID);
-
-		LocalCluster cluster = new LocalCluster();
-		Config c = new Config();
-		c.setNumAckers(0);
-		cluster.submitTopology("test", c, builder.createTopology());
-
-		while (TestSink.RESULT.size() != 8) {
-			Utils.sleep(100);
-		}
-		cluster.shutdown();
-		final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
-		StormTopology stormTopology = flinkBuilder.getStormTopology();
-
-		Set<Integer> taskIds = new HashSet<Integer>();
-
-		for (TopologyContext expectedContext : TestSink.RESULT) {
-			final String thisComponentId = expectedContext.getThisComponentId();
-			int index = taskCounter.get(thisComponentId);
-
-			StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-			when(context.getTaskName()).thenReturn(thisComponentId);
-			when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
-			when(context.getIndexOfThisSubtask()).thenReturn(index);
-			taskCounter.put(thisComponentId, ++index);
-
-			Config stormConfig = new Config();
-			stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test");
-
-			TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context,
-				operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig);
-
-			ComponentCommon expectedCommon = expectedContext.getComponentCommon(thisComponentId);
-			ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
-
-			Assert.assertNull(topologyContext.getCodeDir());
-			Assert.assertNull(common.get_json_conf());
-			Assert.assertNull(topologyContext.getExecutorData(null));
-			Assert.assertNull(topologyContext.getPIDDir());
-			Assert.assertNull(topologyContext.getResource(null));
-			Assert.assertNull(topologyContext.getSharedExecutor());
-			Assert.assertNull(expectedContext.getTaskData(null));
-			Assert.assertNull(topologyContext.getThisWorkerPort());
-
-			Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
-
-			Assert.assertEquals(expectedCommon.get_inputs(), common.get_inputs());
-			Assert.assertEquals(expectedCommon.get_parallelism_hint(), common.get_parallelism_hint());
-			Assert.assertEquals(expectedCommon.get_streams(), common.get_streams());
-			Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
-			Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
-				topologyContext.getComponentStreams(thisComponentId));
-			Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
-			Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
-			Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
-			Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
-			Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
-
-			for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
-				Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
-			}
-
-			for (String componentId : expectedContext.getComponentIds()) {
-				Assert.assertEquals(expectedContext.getSources(componentId),
-					topologyContext.getSources(componentId));
-				Assert.assertEquals(expectedContext.getTargets(componentId),
-					topologyContext.getTargets(componentId));
-
-				for (String streamId : expectedContext.getComponentStreams(componentId)) {
-					Assert.assertEquals(
-						expectedContext.getComponentOutputFields(componentId, streamId).toList(),
-						topologyContext.getComponentOutputFields(componentId, streamId).toList());
-				}
-			}
-
-			for (String streamId : expectedContext.getThisStreams()) {
-				Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
-					topologyContext.getThisOutputFields(streamId).toList());
-			}
-
-			HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
-			Set<Integer> allTaskIds = new HashSet<Integer>();
-			for (String componentId : expectedContext.getComponentIds()) {
-				List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
-				List<Integer> tasks = topologyContext.getComponentTasks(componentId);
-
-				Iterator<Integer> pIt = possibleTasks.iterator();
-				Iterator<Integer> tIt = tasks.iterator();
-				while (pIt.hasNext()) {
-					Assert.assertTrue(tIt.hasNext());
-					Assert.assertNull(taskToComponents.put(pIt.next(), componentId));
-					Assert.assertTrue(allTaskIds.add(tIt.next()));
-				}
-				Assert.assertFalse(tIt.hasNext());
-			}
-
-			Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
-			Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
-
-			try {
-				topologyContext.getHooks();
-				Assert.fail();
-			} catch (UnsupportedOperationException e) { /* expected */ }
-
-			try {
-				topologyContext.getRegisteredMetricByName(null);
-				Assert.fail();
-			} catch (UnsupportedOperationException e) { /* expected */ }
-		}
-	}
-
-}

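For reference, embedding unmodified Spouts and Bolts in a regular DataStream
program remains supported after this removal. The sketch below is illustrative
rather than taken from this PR: it assumes the FileSpout and BoltTokenizer
example classes from flink-storm-examples, a made-up input path, and a
hypothetical driver class name.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.FileSpout;
import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.storm.utils.Utils;

public class EmbeddedSpoutBoltExample {

	public static void main(final String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// embed the unmodified Storm spout as a Flink source;
		// its default output stream is emitted as a raw String stream
		DataStream<String> text = env.addSource(
			new SpoutWrapper<String>(new FileSpout("/path/to/input"), // illustrative path
				new String[] { Utils.DEFAULT_STREAM_ID }),
			TypeExtractor.getForClass(String.class));

		// embed the unmodified Storm bolt as a regular one-input stream operator
		DataStream<Tuple2<String, Integer>> tokens = text.transform(
			"tokenizer",
			TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
			new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()));

		tokens.print();
		env.execute("embedded-spout-bolt-example");
	}
}

Since Flink cannot infer the output type of an embedded Storm operator, the
output type is declared explicitly (via TypeExtractor) for both the wrapped
spout and the wrapped bolt.
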
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services