You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:34 UTC

[09/15] flink git commit: [Storm Compatibility] Maven module restucturing and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
deleted file mode 100644
index 2f3c02d..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
+++ /dev/null
@@ -1,362 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-storm-compatibility-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-storm-compatibility-examples</artifactId>
-	<name>flink-storm-compatibility-examples</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-storm-compatibility-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- get default data from flink-java-examples package -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
-				<executions>
-					<execution>
-						<id>unpack</id>
-						<phase>prepare-package</phase>
-						<goals>
-							<goal>unpack</goal>
-						</goals>
-						<configuration>
-							<artifactItems>
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-java-examples</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
-								</artifactItem>
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-storm-compatibility-core</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-								</artifactItem>
-								<artifactItem>
-									<groupId>org.apache.storm</groupId>
-									<artifactId>storm-core</artifactId>
-									<version>0.9.4</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<!-- need to exclude to be able to run
-									       * StormWordCountRemoteByClient and
-									       * StormWordCountRemoteBySubmitter
-									     within Eclipse -->
-									<excludes>defaults.yaml</excludes>
-								</artifactItem>
-								<artifactItem>
-									<groupId>com.googlecode.json-simple</groupId>
-									<artifactId>json-simple</artifactId>
-									<version>1.1</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-								</artifactItem>
-								<artifactItem>
-									<groupId>org.yaml</groupId>
-									<artifactId>snakeyaml</artifactId>
-									<version>1.11</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-								</artifactItem>
-							</artifactItems>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- self-contained jars for each example -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-
-				<executions>
-
-					<!-- WordCount Spout source-->
-					<!-- example for embedded spout - for whole topologies see "WordCount Storm topology" example below -->
-					<execution>
-						<id>WordCount-SpoutSource</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount</finalName>
-							<classifier>SpoutSource</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.stormcompatibility.wordcount.SpoutSourceWordCount</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<!-- from storm-core -->
-								<include>backtype/storm/topology/*.class</include>
-								<include>backtype/storm/spout/*.class</include>
-								<include>backtype/storm/task/*.class</include>
-								<include>backtype/storm/tuple/*.class</include>
-								<include>backtype/storm/generated/*.class</include>
-								<include>backtype/storm/metric/**/*.class</include>
-								<include>org/apache/thrift7/**/*.class</include>
-								<!-- Storm's recursive dependencies -->
-								<include>org/json/simple/**/*.class</include>
-								<!-- compatibility layer -->
-								<include>org/apache/flink/stormcompatibility/api/*.class</include>
-								<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
-								<!-- Word Count -->
-								<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
-								<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
-								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.class</include>
-								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.class</include>
-								<include>org/apache/flink/stormcompatibility/util/AbstractStormSpout.class</include>
-								<include>org/apache/flink/stormcompatibility/util/StormFileSpout.class</include>
-								<include>org/apache/flink/stormcompatibility/util/StormInMemorySpout.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WordCount Bolt tokenizer-->
-					<!-- example for embedded bolt - for whole topologies see "WordCount Storm topology" example below -->
-					<execution>
-						<id>WordCount-BoltTokenizer</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount</finalName>
-							<classifier>BoltTokenizer</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.stormcompatibility.wordcount.BoltTokenizerWordCount</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<!-- from storm-core -->
-								<include>backtype/storm/topology/*.class</include>
-								<include>backtype/storm/spout/*.class</include>
-								<include>backtype/storm/task/*.class</include>
-								<include>backtype/storm/tuple/*.class</include>
-								<include>backtype/storm/generated/*.class</include>
-								<include>backtype/storm/metric/**/*.class</include>
-								<include>org/apache/thrift7/**/*.class</include>
-								<!-- Storm's recursive dependencies -->
-								<include>org/json/simple/**/*.class</include>
-								<!-- compatibility layer -->
-								<include>org/apache/flink/stormcompatibility/api/*.class</include>
-								<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
-								<!-- Word Count -->
-								<include>org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.class</include>
-								<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WordCount Storm topology-->
-					<!-- Example for whole topologies (ie, if FlinkTopologyBuilder is used) -->
-					<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
-					     However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
-					     Thus, 'defaults.yaml' is not available for maven-jar-plugin.
-					     Nevertheless, we register an empty jar with corresponding name, such that the final jar can be installed to local maven repository.
-					     We use maven-shade-plugin to build the actual jar (which will replace the empty jar). -->
-					<execution>
-						<id>WordCount-StormTopology</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount</finalName>
-							<classifier>StormTopology</classifier>
-						</configuration>
-					</execution>
-
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- WordCount Storm topology-->
-			<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
-			<!-- Build StormTopolgy jar to overwrite empty jar created with maven-jar-plugin. -->
-			<plugin>
-				<artifactId>maven-shade-plugin</artifactId>
-				<groupId>org.apache.maven.plugins</groupId>
-				<version>2.4.1</version>
-				<executions>
-					<execution>
-						<id>WordCount-StormTopology</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount-StormTopology</finalName>
-
-							<artifactSet>
-								<includes>
-									<include>org.apache.storm:storm-core</include>
-									<!-- Storm's recursive dependencies -->
-									<include>org.yaml:snakeyaml</include>
-									<include>com.googlecode.json-simple:json-simple</include>
-									<include>org.apache.flink:flink-storm-compatibility-core</include>
-									<include>org.apache.flink:flink-storm-compatibility-examples</include>
-								</includes>
-							</artifactSet>
-							<filters>
-								<filter>
-									<artifact>org.apache.storm:storm-core</artifact>
-									<includes>
-										<include>defaults.yaml</include>
-										<include>backtype/storm/*.class</include>
-										<include>backtype/storm/topology/*.class</include>
-										<include>backtype/storm/spout/*.class</include>
-										<include>backtype/storm/task/*.class</include>
-										<include>backtype/storm/tuple/*.class</include>
-										<include>backtype/storm/generated/*.class</include>
-										<include>backtype/storm/metric/**/*.class</include>
-										<include>backtype/storm/utils/*.class</include>
-										<include>backtype/storm/serialization/*.class</include>
-										<include>org/apache/storm/curator/**/*.class</include>
-										<include>org/apache/thrift7/**/*.class</include>
-										<!-- Storm's recursive dependencies -->
-										<include>org/json/simple/**/*.class</include>
-										<include>org/yaml/snakeyaml/**/*.class</include>
-									</includes>
-								</filter>
-								<filter>
-									<artifact>org.apache.flink:flink-storm-compatibility-examples</artifact>
-									<includes>
-										<include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
-										<include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
-										<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
-										<include>org/apache/flink/stormcompatibility/util/*.class</include>
-										<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
-									</includes>
-								</filter>
-								<filter>
-									<artifact>org.apache.flink:flink-storm-compatibility-core</artifact>
-									<includes>
-										<include>org/apache/flink/stormcompatibility/api/*.class</include>
-										<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
-									</includes>
-								</filter>
-							</filters>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</mainClass>
-								</transformer>
-							</transformers>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
deleted file mode 100644
index d8d620b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
-import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamation[Local|RemoteByClient|RemoteBySubmitter] &lt;text path&gt;
- * &lt;result path&gt;</code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>construct a regular Storm topology as Flink program</li>
- * <li>make use of the FiniteStormSpout interface</li>
- * </ul>
- */
-public class ExclamationTopology {
-
-	public final static String spoutId = "source";
-	public final static String firstBoltId = "exclamation1";
-	public final static String secondBoltId = "exclamation2";
-	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new SimpleOutputFormatter();
-
-	public static FlinkTopologyBuilder buildTopology() {
-		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-		// get input data
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = textPath.split(":");
-			final String inputFile = tokens[tokens.length - 1];
-			builder.setSpout(spoutId, new FiniteStormFileSpout(inputFile));
-		} else {
-			builder.setSpout(spoutId, new FiniteStormInMemorySpout(WordCountData.WORDS));
-		}
-
-		builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
-		builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId);
-
-		// emit result
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
-					.shuffleGrouping(secondBoltId);
-		} else {
-			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
-					.shuffleGrouping(secondBoltId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInputOutput = false;
-	private static String textPath;
-	private static String outputPath;
-	private static int exclamationNum = 3;
-
-	static int getExclamation() {
-		return exclamationNum;
-	}
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileInputOutput = true;
-			if (args.length == 3) {
-				textPath = args[0];
-				outputPath = args[1];
-				exclamationNum = Integer.parseInt(args[2]);
-			} else {
-				System.err.println(
-						"Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text " +
-						"path> <result path>  <number of exclamation marks>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing StormExclamation example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println(
-					"  Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>" +
-					" <result path> <number of exclamation marks>");
-		}
-
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
deleted file mode 100644
index c8af3a6..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import backtype.storm.utils.Utils;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationWithStormBolt &lt;text path&gt; &lt;result path&gt; &lt;number of exclamation marks&gt;</code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm bolt within a Flink Streaming program</li>
- * </ul>
- */
-public class ExclamationWithStormBolt {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// set Storm configuration
-		StormConfig config = new StormConfig();
-		config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum));
-		env.getConfig().setGlobalJobParameters(config);
-
-		// get input data
-		final DataStream<String> text = getTextDataStream(env);
-
-		final DataStream<String> exclaimed = text
-				.transform("StormBoltTokenizer",
-						TypeExtractor.getForObject(""),
-						new StormBoltWrapper<String, String>(new ExclamationBolt(),
-								new String[] { Utils.DEFAULT_STREAM_ID }))
-				.map(new ExclamationMap());
-
-		// emit result
-		if (fileOutput) {
-			exclaimed.writeAsText(outputPath);
-		} else {
-			exclaimed.print();
-		}
-
-		// execute program
-		env.execute("Streaming WordCount with Storm bolt tokenizer");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private static class ExclamationMap implements MapFunction<String, String> {
-		private static final long serialVersionUID = 4614754344067170619L;
-
-		@Override
-		public String map(String value) throws Exception {
-			return value + "!!!";
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-	private static int exclamationNum = 3;
-
-	private static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 3) {
-				textPath = args[0];
-				outputPath = args[1];
-				exclamationNum = Integer.parseInt(args[2]);
-			} else {
-				System.err.println("Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing ExclamationWithStormBolt example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: ExclamationWithStormBolt <text path> <result path> <number of exclamation marks>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		}
-
-		return env.fromElements(WordCountData.WORDS);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
deleted file mode 100644
index 99c816d..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
-import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import backtype.storm.utils.Utils;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationWithStormSpout &lt;text path&gt; &lt;result path&gt;</code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm spout within a Flink Streaming program</li>
- * <li>make use of the FiniteStormSpout interface</li>
- * </ul>
- */
-public class ExclamationWithStormSpout {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		final DataStream<String> text = getTextDataStream(env);
-
-		final DataStream<String> exclaimed = text
-				.map(new ExclamationMap())
-				.map(new ExclamationMap());
-
-		// emit result
-		if (fileOutput) {
-			exclaimed.writeAsText(outputPath);
-		} else {
-			exclaimed.print();
-		}
-
-		// execute program
-		env.execute("Streaming Exclamation with Storm spout source");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private static class ExclamationMap implements MapFunction<String, String> {
-		private static final long serialVersionUID = -684993133807698042L;
-
-		@Override
-		public String map(String value) throws Exception {
-			return value + "!!!";
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: ExclamationWithStormSpout <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing ExclamationWithStormSpout example with built-in default " +
-					"data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: ExclamationWithStormSpout <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			final String[] tokens = textPath.split(":");
-			final String inputFile = tokens[tokens.length - 1];
-
-			// set Storm configuration
-			StormConfig config = new StormConfig();
-			config.put(FiniteStormFileSpout.INPUT_FILE_PATH, inputFile);
-			env.getConfig().setGlobalJobParameters(config);
-
-			return env.addSource(
-					new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(),
-							new String[] { Utils.DEFAULT_STREAM_ID }),
-							TypeExtractor.getForClass(String.class)).setParallelism(1);
-		}
-
-		return env.addSource(
-				new FiniteStormSpoutWrapper<String>(new FiniteStormInMemorySpout(
-						WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
-						TypeExtractor.getForClass(String.class)).setParallelism(1);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
deleted file mode 100644
index c87b3a5..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}
- * and submitted to Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}.
- * <p/>
- * This example shows how to run program directly within Java, thus it cannot be used to submit a
- * {@link backtype.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationLocal &lt;text path&gt; &lt;result path&gt;</code><br/>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink</li>
- * </ul>
- */
-public class StormExclamationLocal {
-
-	public final static String topologyId = "Streaming Exclamation";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!ExclamationTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
-
-		// execute program locally
-		Config conf = new Config();
-		conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, conf, builder.createTopology());
-
-		Utils.sleep(10 * 1000);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
deleted file mode 100644
index 0f64301..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.utils.Utils;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
- * submitted to Flink for execution in the same way as to a Storm cluster similar to
- * {@link NimbusClient}. The Flink cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java, thus it cannot be used to submit a
- * {@link StormTopology} via Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationRemoteByClient &lt;text path&gt; &lt;result path&gt;</code><br/>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class StormExclamationRemoteByClient {
-
-	public final static String topologyId = "Streaming Exclamation";
-	private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-ExclamationStorm.jar";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
-			NotAliveException {
-
-		if (!ExclamationTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
-
-		// execute program on Flink cluster
-		final Config conf = new Config();
-		// can be changed to remote address
-		conf.put(Config.NIMBUS_HOST, "localhost");
-		// use default flink jobmanger.rpc.port
-		conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-
-		final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
-		cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology());
-
-		Utils.sleep(5 * 1000);
-
-		cluster.killTopology(topologyId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
deleted file mode 100644
index d580520..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import backtype.storm.Config;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
- * submitted to Flink for execution in the same way as to a Storm cluster similar to
- * {@link StormSubmitter}. The Flink cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>StormExclamationRemoteByClient &lt;text path&gt; &lt;result path&gt;</code><br/>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class StormExclamationRemoteBySubmitter {
-
-	public final static String topologyId = "Streaming Exclamation";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!ExclamationTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
-
-		// execute program on Flink cluster
-		final Config conf = new Config();
-		// We can set Jobmanager host/port values manually or leave them blank
-		// if not set and
-		// - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter
-		// - executed via bin/flink values from flink-conf.yaml are set by FlinkSubmitter.
-		// conf.put(Config.NIMBUS_HOST, "localhost");
-		// conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
-
-		// The user jar file must be specified via JVM argument if executed via Java.
-		// => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar
-		// If bin/flink is used, the jar file is detected automatically.
-		FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
-
-		Thread.sleep(5 * 1000);
-
-		FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
deleted file mode 100644
index 2709eff..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-public class ExclamationBolt implements IRichBolt {
-	private final static long serialVersionUID = -6364882114201311380L;
-
-	public final static String EXCLAMATION_COUNT = "exclamation.count";
-
-	private OutputCollector collector;
-	private String exclamation;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-		this.collector = collector;
-
-		Object count = conf.get(EXCLAMATION_COUNT);
-		if (count != null) {
-			int exclamationNum = (Integer) count;
-			StringBuilder builder = new StringBuilder();
-			for (int index = 0; index < exclamationNum; ++index) {
-				builder.append('!');
-			}
-			this.exclamation = builder.toString();
-		} else {
-			this.exclamation = "!";
-		}
-	}
-
-	@Override
-	public void cleanup() {
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("word"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
deleted file mode 100644
index 79c7125..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin;
-
-import backtype.storm.tuple.Fields;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.AgeSpout;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.GenderSpout;
-import org.apache.flink.stormcompatibility.singlejoin.stormoperators.SingleJoinBolt;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
-
-public class SingleJoinTopology {
-
-	public final static String spoutId1 = "gender";
-	public final static String spoutId2 = "age";
-	public final static String boltId = "singleJoin";
-	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static FlinkTopologyBuilder buildTopology() {
-
-		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-		// get input data
-		builder.setSpout(spoutId1, new GenderSpout(new Fields("id", "gender")));
-		builder.setSpout(spoutId2, new AgeSpout(new Fields("id", "age")));
-
-		builder.setBolt(boltId, new SingleJoinBolt(new Fields("gender", "age")))
-		.fieldsGrouping(spoutId1, new Fields("id"))
-		.fieldsGrouping(spoutId2, new Fields("id"));
-		//.shuffleGrouping(spoutId1)
-		//.shuffleGrouping(spoutId2);
-
-		// emit result
-		if (fileInputOutput) {
-			// read the text file from given input path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(boltId);
-		} else {
-			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(boltId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInputOutput = false;
-	private static String outputPath;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileInputOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: StormSingleJoin* <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing StormSingleJoin* example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: StormSingleJoin* <result path>");
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
deleted file mode 100644
index d70914a..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin;
-
-import backtype.storm.utils.Utils;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-public class StormSingleJoinLocal {
-	public final static String topologyId = "Streaming SingleJoin";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!SingleJoinTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = SingleJoinTopology.buildTopology();
-
-		// execute program locally
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
-
-		Utils.sleep(5 * 1000);
-
-		// TODO kill does no do anything so far
-		cluster.killTopology(topologyId);
-		cluster.shutdown();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
deleted file mode 100644
index 49761c3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-public class AgeSpout extends AbstractStormSpout {
-	private static final long serialVersionUID = -4008858647468647019L;
-
-	private int counter = 0;
-	private String gender;
-	private Fields outFields;
-
-	public AgeSpout(Fields outFields) {
-		this.outFields = outFields;
-	}
-
-	@Override
-	public void nextTuple() {
-		if (this.counter < 10) {
-			if (counter % 2 == 0) {
-				gender = "male";
-			} else {
-				gender = "female";
-			}
-			this.collector.emit(new Values(counter, gender));
-			counter++;
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(outFields);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
deleted file mode 100644
index 238b6db..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-public class GenderSpout extends AbstractStormSpout {
-	private static final long serialVersionUID = -5079110197950743927L;
-
-	private int counter = 9;
-	private Fields outFields;
-
-	public GenderSpout(Fields outFields) {
-		this.outFields = outFields;
-	}
-
-	@Override
-	public void nextTuple() {
-		if (counter >= 0) {
-			this.collector.emit(new Values(counter, counter + 20));
-			counter--;
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(outFields);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
deleted file mode 100644
index cd53140..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@SuppressWarnings("deprecation")
-public class SingleJoinBolt implements IRichBolt {
-	OutputCollector collector;
-	Fields idFields;
-	Fields outFields;
-	int numSources = 2;
-	TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> pending;
-	Map<String, GlobalStreamId> fieldLocations;
-
-	public SingleJoinBolt(Fields outFields) {
-		this.outFields = outFields;
-	}
-
-	@SuppressWarnings({"rawtypes", "null"})
-	@Override
-	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-		fieldLocations = new HashMap<String, GlobalStreamId>();
-		this.collector = collector;
-		int timeout = 100;
-		pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
-		// numSources = context.getThisSources().size();
-		Set<String> idFields = null;
-		for (GlobalStreamId source : context.getThisSources().keySet()) {
-			Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
-			Set<String> setFields = new HashSet<String>(fields.toList());
-			if (idFields == null) {
-				idFields = setFields;
-			} else {
-				idFields.retainAll(setFields);
-			}
-
-			for (String outfield : outFields) {
-				for (String sourcefield : fields) {
-					if (outfield.equals(sourcefield)) {
-						fieldLocations.put(outfield, source);
-					}
-				}
-			}
-		}
-		this.idFields = new Fields(new ArrayList<String>(idFields));
-
-		if (fieldLocations.size() != outFields.size()) {
-			throw new RuntimeException("Cannot find all outfields among sources");
-		}
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		List<Object> id = tuple.select(idFields);
-		GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
-		if (!pending.containsKey(id)) {
-			pending.put(id, new HashMap<GlobalStreamId, Tuple>());
-		}
-		Map<GlobalStreamId, Tuple> parts = pending.get(id);
-		if (parts.containsKey(streamId)) {
-			throw new RuntimeException("Received same side of single join twice");
-		}
-		parts.put(streamId, tuple);
-		if (parts.size() == numSources) {
-			pending.remove(id);
-			List<Object> joinResult = new ArrayList<Object>();
-			for (String outField : outFields) {
-				GlobalStreamId loc = fieldLocations.get(outField);
-				joinResult.add(parts.get(loc).getValueByField(outField));
-			}
-			collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
-
-			for (Tuple part : parts.values()) {
-				collector.ack(part);
-			}
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		/* nothing to do */
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(outFields);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-	private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
-		@Override
-		public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
-			for (Tuple tuple : tuples.values()) {
-				collector.fail(tuple);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
deleted file mode 100644
index 18251d4..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
-import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamMapper;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Implements a simple example with two declared output streams for the embedded Spout.
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>handle multiple output stream of a spout</li>
- * <li>accessing each stream by .split(...) and .select(...)</li>
- * <li>strip wrapper data type SplitStreamType for furhter processing in Flink</li>
- * </ul>
- * <p/>
- * This example would work the same way for multiple bolt output streams.
- */
-public class SpoutSplitExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		String[] rawOutputs = new String[] { RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM };
-
-		final DataStream<SplitStreamType<Integer>> numbers = env.addSource(
-				new StormSpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
-						rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));
-
-		SplitStream<SplitStreamType<Integer>> splitStream = numbers
-				.split(new FlinkStormStreamSelector<Integer>());
-
-		DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
-		DataStream<SplitStreamType<Integer>> oddStream = splitStream.select(RandomSpout.ODD_STREAM);
-
-		evenStream.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich("even")).print();
-		oddStream.transform("oddBolt",
-				TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
-				new StormBoltWrapper<SplitStreamType<Integer>, Tuple2<String, Integer>>(
-						new VerifyAndEnrichBolt(false)))
-						.print();
-
-		// execute program
-		env.execute("Spout split stream example");
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Same as {@link VerifyAndEnrichBolt}.
-	 */
-	private final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 5213888269197438892L;
-		private final Tuple2<String, Integer> out;
-
-		public Enrich(String token) {
-			this.out = new Tuple2<String, Integer>(token, 0);
-		}
-
-		@Override
-		public Tuple2<String, Integer> map(Integer value) throws Exception {
-			this.out.setField(value, 1);
-			return this.out;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
deleted file mode 100644
index 75d710e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split.stormoperators;
-
-import java.util.Map;
-import java.util.Random;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-public class RandomSpout extends BaseRichSpout {
-	private static final long serialVersionUID = -3978554318742509334L;
-
-	public static final String EVEN_STREAM = "even";
-	public static final String ODD_STREAM = "odd";
-
-	private final boolean split;
-	private Random r = new Random();
-	private SpoutOutputCollector collector;
-
-	public RandomSpout(boolean split, long seed) {
-		this.split = split;
-		this.r = new Random(seed);
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void nextTuple() {
-		int i = r.nextInt();
-		if (split) {
-			if (i % 2 == 0) {
-				this.collector.emit(EVEN_STREAM, new Values(i));
-			} else {
-				this.collector.emit(ODD_STREAM, new Values(i));
-			}
-		} else {
-			this.collector.emit(new Values(i));
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		Fields schema = new Fields("number");
-		if (split) {
-			declarer.declareStream(EVEN_STREAM, schema);
-			declarer.declareStream(ODD_STREAM, schema);
-		} else {
-			declarer.declare(schema);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
deleted file mode 100644
index 5853705..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split.stormoperators;
-
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class VerifyAndEnrichBolt extends BaseRichBolt {
-	private static final long serialVersionUID = -7277395570966328721L;
-
-	private final boolean evenOrOdd; // true: even -- false: odd
-	private final String token;
-	private OutputCollector collector;
-
-	public VerifyAndEnrichBolt(boolean evenOrOdd) {
-		this.evenOrOdd = evenOrOdd;
-		this.token = evenOrOdd ? "even" : "odd";
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		if ((input.getInteger(0) % 2 == 0) != this.evenOrOdd) {
-			throw new RuntimeException("Invalid number detected.");
-		}
-		this.collector.emit(new Values(this.token, input.getInteger(0)));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("evenOrOdd", "number"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
deleted file mode 100644
index b121744..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-import java.util.Map;
-
-/**
- * Implements a sink that write the received data so some external output. The result is formatted like
- * {@code (a1, a2, ..., an)} with {@code Object.toString()} for each attribute).
- */
-public abstract class AbstractStormBoltSink implements IRichBolt {
-	private static final long serialVersionUID = -1626323806848080430L;
-
-	private StringBuilder lineBuilder;
-	private String prefix = "";
-	private final OutputFormatter formatter;
-
-	public AbstractStormBoltSink(final OutputFormatter formatter) {
-		this.formatter = formatter;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public final void prepare(final Map stormConf, final TopologyContext context,
-			final OutputCollector collector) {
-		this.prepareSimple(stormConf, context);
-		if (context.getComponentCommon(context.getThisComponentId()).get_parallelism_hint() > 1) {
-			this.prefix = context.getThisTaskId() + "> ";
-		}
-	}
-
-	protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context);
-
-	@Override
-	public final void execute(final Tuple input) {
-		this.lineBuilder = new StringBuilder();
-		this.lineBuilder.append(this.prefix);
-		this.lineBuilder.append(this.formatter.format(input));
-		this.writeExternal(this.lineBuilder.toString());
-	}
-
-	protected abstract void writeExternal(final String line);
-
-	@Override
-	public void cleanup() {/* nothing to do */}
-
-	@Override
-	public final void declareOutputFields(final OutputFieldsDeclarer declarer) {/* nothing to do */}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
deleted file mode 100644
index 4739a2c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-import java.util.Map;
-
-/**
- * Base class for Storm Spout that reads data line by line from an arbitrary source. The declared output schema has a
- * single attribute called {@code line} and should be of type {@link String}.
- */
-public abstract class AbstractStormSpout implements IRichSpout {
-	private static final long serialVersionUID = 8876828403487806771L;
-
-	public final static String ATTRIBUTE_LINE = "line";
-
-	protected SpoutOutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void close() {/* noting to do */}
-
-	@Override
-	public void activate() {/* noting to do */}
-
-	@Override
-	public void deactivate() {/* noting to do */}
-
-	@Override
-	public void ack(final Object msgId) {/* noting to do */}
-
-	@Override
-	public void fail(final Object msgId) {/* noting to do */}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields(ATTRIBUTE_LINE));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
deleted file mode 100644
index d3776fb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Implements a Storm Spout that reads data from a given local file. The spout stops automatically
- * when it reached the end of the file.
- */
-public class FiniteStormFileSpout extends StormFileSpout implements FiniteStormSpout {
-	private static final long serialVersionUID = -1472978008607215864L;
-
-	private String line;
-	private boolean newLineRead;
-
-	public FiniteStormFileSpout() {}
-
-	public FiniteStormFileSpout(String path) {
-		super(path);
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-		super.open(conf, context, collector);
-		newLineRead = false;
-	}
-
-	@Override
-	public void nextTuple() {
-		this.collector.emit(new Values(line));
-		newLineRead = false;
-	}
-
-	/**
-	 * Can be called before nextTuple() any times including 0.
-	 */
-	@Override
-	public boolean reachedEnd() {
-		try {
-			readLine();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while reading file " + path);
-		}
-		return line == null;
-	}
-
-	private void readLine() throws IOException {
-		if (!newLineRead) {
-			line = reader.readLine();
-			newLineRead = true;
-		}
-	}
-
-}