You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:03 UTC

[13/27] flink git commit: [storm-compat] Simple examples added

[storm-compat] Simple examples added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09e5be41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09e5be41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09e5be41

Branch: refs/heads/master
Commit: 09e5be41b3bcbfe7aeb23a92b3ebf62f9a2f6968
Parents: e497a83
Author: szape <ne...@gmail.com>
Authored: Thu May 21 10:16:58 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Jun 14 23:00:10 2015 +0200

----------------------------------------------------------------------
 .../excamation/ExclamationTopology.java         |  99 ++++++++++++++++
 .../excamation/StormBoltExclamation.java        | 113 ++++++++++++++++++
 .../excamation/StormExclamationLocal.java       |  52 ++++++++
 .../StormExclamationRemoteByClient.java         |  61 ++++++++++
 .../StormExclamationRemoteBySubmitter.java      |  60 ++++++++++
 .../excamation/StormSpoutExclamation.java       | 118 +++++++++++++++++++
 .../stormoperators/ExclamationBolt.java         |  58 +++++++++
 .../singlejoin/SingleJoinTopology.java          |  73 ++++++++++++
 .../singlejoin/StormSingleJoinLocal.java        |  34 ++++++
 .../singlejoin/stormoperators/AgeSpout.java     |  37 ++++++
 .../singlejoin/stormoperators/GenderSpout.java  |  28 +++++
 .../stormoperators/SingleJoinBolt.java          | 116 ++++++++++++++++++
 .../util/SimpleOutputFormatter.java             |  27 +++++
 .../stormcompatibility/util/StormFileSpout.java |  78 ++++++++++++
 .../util/StormInMemorySpout.java                |  43 +++++++
 .../util/TupleOutputFormatter.java              |  40 +++++++
 .../wordcount/BoltTokenizerWordCount.java       |   6 +-
 .../wordcount/SpoutSourceWordCount.java         |  12 +-
 .../wordcount/StormWordCountLocal.java          |   6 +-
 .../wordcount/WordCountTopology.java            |  16 +--
 .../stormoperators/StormBoltCounter.java        |   3 +-
 .../stormoperators/StormFileSpout.java          |  79 -------------
 .../stormoperators/StormInMemorySpout.java      |  39 ------
 .../WordCountOutputFormatter.java               |  41 -------
 .../exclamation/StormBoltExclamationITCase.java |  47 ++++++++
 .../StormExclamationLocalITCase.java            |  49 ++++++++
 .../StormSpoutExclamationITCase.java            |  47 ++++++++
 .../exclamation/util/ExclamationData.java       |  98 +++++++++++++++
 28 files changed, 1300 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
new file mode 100644
index 0000000..dc40d15
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.util.OutputFormatter;
+import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter;
+import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
+import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
+import org.apache.flink.stormcompatibility.util.StormFileSpout;
+import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+
+/**
+ * This is a basic example of a Storm topology.
+ */
+public class ExclamationTopology {
+
+	public final static String spoutId = "source";
+	public final static String firstBoltId = "exlamation1";
+	public final static String secondBoltId = "exclamation2";
+	public final static String sinkId = "sink";
+	private final static OutputFormatter formatter = new SimpleOutputFormatter();
+
+	public static FlinkTopologyBuilder buildTopology() {
+		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+		// get input data
+		if (fileInputOutput) {
+			// read the text file from given input path
+			final String[] tokens = textPath.split(":");
+			final String inputFile = tokens[tokens.length - 1];
+			builder.setSpout(spoutId, new StormFileSpout(inputFile));
+		} else {
+			builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
+		}
+
+		builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
+		builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId);
+
+		// emit result
+		if (fileInputOutput) {
+			// write the result to the file at the given output path
+			final String[] tokens = outputPath.split(":");
+			final String outputFile = tokens[tokens.length - 1];
+			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(secondBoltId);
+		} else {
+			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(secondBoltId);
+		}
+
+		return builder;
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInputOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileInputOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: StormExclamation* <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing StormExclamation* example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: StormExclamation* <text path> <result path>");
+		}
+
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
new file mode 100644
index 0000000..52e740c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class StormBoltExclamation {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<String> exclaimed = text
+				.transform("StormBoltTokenizer",
+						TypeExtractor.getForObject(""),
+						new StormBoltWrapper<String, String>(new ExclamationBolt(), true))
+				.map(new ExclamationMap());
+
+		// emit result
+		if (fileOutput) {
+			exclaimed.writeAsText(outputPath);
+		} else {
+			exclaimed.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with Storm bolt tokenizer");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class ExclamationMap implements MapFunction<String, String> {
+
+		@Override
+		public String map(String value) throws Exception {
+			return value + "!!!";
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: StormBoltExclamation <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing StormBoltExclamation example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: StormBoltExclamation <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		}
+
+		return env.fromElements(WordCountData.WORDS);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
new file mode 100644
index 0000000..c87fe8f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import backtype.storm.utils.Utils;
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+public class StormExclamationLocal {
+
+	public final static String topologyId = "Streaming Exclamation";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!ExclamationTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
+
+		// execute program locally
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, null, builder.createTopology());
+
+		Utils.sleep(5 * 1000);
+
+		// TODO kill does not do anything so far
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
new file mode 100644
index 0000000..3f55316
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.utils.Utils;
+import org.apache.flink.stormcompatibility.api.FlinkClient;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+public class StormExclamationRemoteByClient {
+
+	public final static String topologyId = "Streaming Exclamation";
+	private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-ExclamationStorm.jar";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
+			NotAliveException {
+
+		if (!ExclamationTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
+
+		// execute program on Flink cluster
+		final Config conf = new Config();
+		// can be changed to remote address
+		conf.put(Config.NIMBUS_HOST, "localhost");
+		// use default flink jobmanager.rpc.port
+		conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+
+		final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
+		cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology());
+
+		Utils.sleep(5 * 1000);
+
+		cluster.killTopology(topologyId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
new file mode 100644
index 0000000..728c5c7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import backtype.storm.Config;
+import org.apache.flink.stormcompatibility.api.FlinkClient;
+import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+public class StormExclamationRemoteBySubmitter {
+
+	public final static String topologyId = "Streaming Exclamation";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!ExclamationTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
+
+		// execute program on Flink cluster
+		final Config conf = new Config();
+		// We can set the JobManager host/port values manually or leave them unset.
+		// If they are not set and the program is
+		// - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter
+		// - executed via bin/flink, values from flink-conf.yaml are set by FlinkSubmitter.
+		// conf.put(Config.NIMBUS_HOST, "localhost");
+		// conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
+
+		// The user jar file must be specified via JVM argument if executed via Java.
+		// => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-ExclamationStorm.jar
+		// If bin/flink is used, the jar file is detected automatically.
+		FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+
+		Thread.sleep(5 * 1000);
+
+		FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
new file mode 100644
index 0000000..2569fa7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.util.StormFileSpout;
+import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class StormSpoutExclamation {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<String> exclaimed = text
+				.map(new ExclamationMap())
+				.map(new ExclamationMap());
+
+		// emit result
+		if (fileOutput) {
+			exclaimed.writeAsText(outputPath);
+		} else {
+			exclaimed.print();
+		}
+
+		// execute program
+		env.execute("Streaming Exclamation with Storm spout source");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class ExclamationMap implements MapFunction<String, String> {
+
+		@Override
+		public String map(String value) throws Exception {
+			return value + "!!!";
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: StormSpoutExclamation <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing StormSpoutExclamation example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: StormSpoutExclamation <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			final String[] tokens = textPath.split(":");
+			final String localFile = tokens[tokens.length - 1];
+			return env.addSource(
+					new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
+					TypeExtractor.getForClass(String.class)).setParallelism(1);
+		}
+
+		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
+				TypeExtractor.getForClass(String.class));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
new file mode 100644
index 0000000..14232b7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation.stormoperators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+public class ExclamationBolt implements IRichBolt {
+	OutputCollector _collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+		_collector = collector;
+	}
+
+	@Override
+	public void cleanup() {
+	}
+
+	@Override
+	public void execute(Tuple tuple) {
+		_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("word"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
new file mode 100644
index 0000000..cae50b7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
@@ -0,0 +1,73 @@
+package org.apache.flink.stormcompatibility.singlejoin;
+
+import backtype.storm.tuple.Fields;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.singlejoin.stormoperators.AgeSpout;
+import org.apache.flink.stormcompatibility.singlejoin.stormoperators.GenderSpout;
+import org.apache.flink.stormcompatibility.singlejoin.stormoperators.SingleJoinBolt;
+import org.apache.flink.stormcompatibility.util.OutputFormatter;
+import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
+import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
+import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
+
+public class SingleJoinTopology {
+
+	public final static String spoutId1 = "gender";
+	public final static String spoutId2 = "age";
+	public final static String boltId = "singleJoin";
+	public final static String sinkId = "sink";
+	private final static OutputFormatter formatter = new TupleOutputFormatter();
+
+	public static FlinkTopologyBuilder buildTopology() {
+
+		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+		// get input data
+		builder.setSpout(spoutId1, new GenderSpout(new Fields("id", "gender")));
+		builder.setSpout(spoutId2, new AgeSpout(new Fields("id", "age")));
+
+		builder.setBolt(boltId, new SingleJoinBolt(new Fields("gender", "age")))
+				.fieldsGrouping(spoutId1, new Fields("id"))
+				.fieldsGrouping(spoutId2, new Fields("id"));
+				//.shuffleGrouping(spoutId1)
+				//.shuffleGrouping(spoutId2);
+
+		// emit result
+		if (fileInputOutput) {
+			// write the result to the file at the given output path
+			final String[] tokens = outputPath.split(":");
+			final String outputFile = tokens[tokens.length - 1];
+			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(boltId);
+		} else {
+			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(boltId);
+		}
+
+		return builder;
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInputOutput = false;
+	private static String outputPath;
+
+	static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileInputOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: StormSingleJoin* <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing StormSingleJoin* example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: StormSingleJoin* <result path>");
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
new file mode 100644
index 0000000..8d66d29
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
@@ -0,0 +1,34 @@
+package org.apache.flink.stormcompatibility.singlejoin;
+
+import backtype.storm.utils.Utils;
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.wordcount.WordCountTopology;
+
+/**
+ * Entry point that executes the SingleJoin example topology on a {@link FlinkLocalCluster}.
+ */
+public class StormSingleJoinLocal {
+	public static final String topologyId = "Streaming SingleJoin";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	/**
+	 * Builds the topology, submits it to a local cluster, waits five seconds and then kills the topology and shuts the
+	 * cluster down. Returns immediately if the arguments are rejected.
+	 *
+	 * @param args
+	 * 		command line arguments, validated by {@link SingleJoinTopology#parseParameters(String[])}
+	 */
+	public static void main(final String[] args) throws Exception {
+		final boolean argumentsAccepted = SingleJoinTopology.parseParameters(args);
+		if (argumentsAccepted) {
+			// assemble the topology the Storm way
+			final FlinkTopologyBuilder topologyBuilder = SingleJoinTopology.buildTopology();
+
+			// run the program on a local cluster
+			final FlinkLocalCluster localCluster = FlinkLocalCluster.getLocalCluster();
+			localCluster.submitTopology(topologyId, null, topologyBuilder.createTopology());
+
+			Utils.sleep(5 * 1000);
+
+			// TODO kill does not do anything so far
+			localCluster.killTopology(topologyId);
+			localCluster.shutdown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
new file mode 100644
index 0000000..4edc133
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
@@ -0,0 +1,37 @@
+package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
+
+/**
+ * Finite example spout that emits ten two-value tuples {@code (counter, gender)} for {@code counter} running from 0
+ * to 9. Even counters are paired with {@code "male"}, odd counters with {@code "female"}.
+ * <p>
+ * NOTE(review): despite its name this spout emits gender strings, while {@code GenderSpout} emits numbers; the two
+ * class names look swapped -- confirm against the output fields passed in by the topology.
+ */
+public class AgeSpout extends AbstractStormSpout {
+	private static final long serialVersionUID = -4008858647468647019L;
+
+	/** Number of tuples emitted before {@link #nextTuple()} becomes a no-op. */
+	private static final int NUMBER_OF_TUPLES = 10;
+
+	/** Output schema declared by {@link #declareOutputFields(OutputFieldsDeclarer)}. */
+	private final Fields outFields;
+	private int counter = 0;
+
+	/**
+	 * @param outFields
+	 * 		the names of the two emitted values
+	 */
+	public AgeSpout(Fields outFields) {
+		this.outFields = outFields;
+	}
+
+	/**
+	 * Emits the next {@code (counter, gender)} tuple, or does nothing once all tuples were emitted.
+	 */
+	@Override
+	public void nextTuple() {
+		if (this.counter < NUMBER_OF_TUPLES) {
+			// derived from the counter on every call, hence a local instead of serialized instance state
+			final String gender = (this.counter % 2 == 0) ? "male" : "female";
+			this.collector.emit(new Values(this.counter, gender));
+			this.counter++;
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(this.outFields);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
new file mode 100644
index 0000000..ac25917
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
@@ -0,0 +1,28 @@
+package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
+
+/**
+ * Finite example spout that emits ten two-value tuples {@code (counter, counter + 20)} for {@code counter} running
+ * from 9 down to 0.
+ * <p>
+ * NOTE(review): despite its name this spout emits numbers, while {@code AgeSpout} emits gender strings; the two class
+ * names look swapped -- confirm against the output fields passed in by the topology.
+ */
+public class GenderSpout extends AbstractStormSpout {
+	// declared explicitly like in the sibling spouts, so the serialized form does not depend on compiler details
+	private static final long serialVersionUID = 1L;
+
+	/** Output schema declared by {@link #declareOutputFields(OutputFieldsDeclarer)}. */
+	private final Fields outFields;
+	private int counter = 9;
+
+	/**
+	 * @param outFields
+	 * 		the names of the two emitted values
+	 */
+	public GenderSpout(Fields outFields) {
+		this.outFields = outFields;
+	}
+
+	/**
+	 * Emits the next {@code (counter, counter + 20)} tuple, or does nothing once the counter dropped below zero.
+	 */
+	@Override
+	public void nextTuple() {
+		if (this.counter >= 0) {
+			this.collector.emit(new Values(this.counter, this.counter + 20));
+			this.counter--;
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(this.outFields);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
new file mode 100644
index 0000000..3df1618
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
@@ -0,0 +1,116 @@
+package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
+
+import backtype.storm.Config;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TimeCacheMap;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Bolt that joins the tuples of its input streams on the fields all inputs have in common and emits one tuple made up
+ * of the configured output fields per completed join.
+ * <p>
+ * Incomplete joins are buffered in a {@link TimeCacheMap}; when a buffered entry expires before it is complete, all
+ * tuples collected for it are failed.
+ */
+@SuppressWarnings("deprecation")
+public class SingleJoinBolt implements IRichBolt {
+	private static final long serialVersionUID = 1L;
+
+	OutputCollector collector;
+	/** Fields shared by all input streams; their values form the join key. */
+	Fields idFields;
+	/** Fields of the emitted join result. */
+	Fields outFields;
+	/** Number of distinct input streams that have to contribute a tuple before a join is complete. */
+	int numSources = 2;
+	/** Incomplete joins: join key to the tuples received so far, keyed by the stream they arrived on. */
+	TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> pending;
+	/** For every output field, the input stream whose tuples provide its value. */
+	Map<String, GlobalStreamId> fieldLocations;
+
+	/**
+	 * @param outFields
+	 * 		the fields of the join result; each of them must be provided by one of the input streams
+	 */
+	public SingleJoinBolt(Fields outFields) {
+		this.outFields = outFields;
+	}
+
+	/**
+	 * Determines the join key (the fields common to all input streams) and, for every output field, the input stream
+	 * that provides it.
+	 *
+	 * @throws RuntimeException
+	 * 		if the bolt has no input streams or an output field is not declared by any input stream
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+		fieldLocations = new HashMap<String, GlobalStreamId>();
+		this.collector = collector;
+		int timeout = 100;
+		pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
+		// numSources stays at its fixed default; deriving it from the context is currently disabled:
+		// numSources = context.getThisSources().size();
+		Set<String> idFields = null;
+		for (GlobalStreamId source : context.getThisSources().keySet()) {
+			Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
+			Set<String> setFields = new HashSet<String>(fields.toList());
+			if (idFields == null) {
+				// first source: start with all of its fields ...
+				idFields = setFields;
+			} else {
+				// ... and narrow them down to the fields every further source declares as well
+				idFields.retainAll(setFields);
+			}
+
+			for (String outfield : outFields) {
+				for (String sourcefield : fields) {
+					if (outfield.equals(sourcefield)) {
+						fieldLocations.put(outfield, source);
+					}
+				}
+			}
+		}
+		if (idFields == null) {
+			// no source was visited at all; fail with a clear message instead of a bare NullPointerException
+			throw new RuntimeException("SingleJoinBolt has no input sources to join");
+		}
+		this.idFields = new Fields(new ArrayList<String>(idFields));
+
+		if (fieldLocations.size() != outFields.size()) {
+			throw new RuntimeException("Cannot find all outfields among sources");
+		}
+	}
+
+	/**
+	 * Buffers the tuple under its join key. Once tuples from {@code numSources} streams are present for that key, the
+	 * join result is emitted anchored to all of them and every contributing tuple is acked.
+	 *
+	 * @throws RuntimeException
+	 * 		if a second tuple with the same join key arrives on the same stream before the join completed
+	 */
+	@Override
+	public void execute(Tuple tuple) {
+		List<Object> id = tuple.select(idFields);
+		GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
+		if (!pending.containsKey(id)) {
+			pending.put(id, new HashMap<GlobalStreamId, Tuple>());
+		}
+		Map<GlobalStreamId, Tuple> parts = pending.get(id);
+		if (parts.containsKey(streamId)) {
+			throw new RuntimeException("Received same side of single join twice");
+		}
+		parts.put(streamId, tuple);
+		if (parts.size() == numSources) {
+			pending.remove(id);
+			List<Object> joinResult = new ArrayList<Object>();
+			for (String outField : outFields) {
+				// take each output value from the tuple of the stream that declares the field
+				GlobalStreamId loc = fieldLocations.get(outField);
+				joinResult.add(parts.get(loc).getValueByField(outField));
+			}
+			collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
+
+			for (Tuple part : parts.values()) {
+				collector.ack(part);
+			}
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		/* nothing to do */
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(outFields);
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+	/**
+	 * Fails every tuple of a buffered join that expired before it was completed.
+	 */
+	private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
+		@Override
+		public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
+			for (Tuple tuple : tuples.values()) {
+				collector.fail(tuple);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
new file mode 100644
index 0000000..fca5fcc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+/**
+ * {@link OutputFormatter} that renders a value through its string representation.
+ * <p>
+ * {@link java.io.Serializable} is declared explicitly, matching {@code TupleOutputFormatter}.
+ */
+public class SimpleOutputFormatter implements OutputFormatter, java.io.Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * @param input
+	 * 		the value to format, may be {@code null}
+	 * @return the result of {@code String.valueOf(input)}, i.e. the text {@code "null"} for a {@code null} input
+	 */
+	@Override
+	public String format(Object input) {
+		// String.valueOf instead of input.toString() so that a null value does not abort the sink
+		return String.valueOf(input);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
new file mode 100644
index 0000000..c38b599
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Storm spout that emits the content of a local text file, one line per {@link #nextTuple()} call.
+ */
+public final class StormFileSpout extends AbstractStormSpout {
+	private static final long serialVersionUID = -6996907090003590436L;
+
+	private final String path;
+	private BufferedReader reader;
+
+	/**
+	 * @param path
+	 * 		path of the local file to read
+	 */
+	public StormFileSpout(final String path) {
+		this.path = path;
+	}
+
+	/**
+	 * Opens the file for reading.
+	 *
+	 * @throws RuntimeException
+	 * 		wrapping the {@link FileNotFoundException} if the file cannot be opened
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+		super.open(conf, context, collector);
+		final FileReader fileReader;
+		try {
+			fileReader = new FileReader(this.path);
+		} catch (final FileNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+		this.reader = new BufferedReader(fileReader);
+	}
+
+	/**
+	 * Closes the underlying reader if the file was opened.
+	 */
+	@Override
+	public void close() {
+		if (this.reader == null) {
+			return;
+		}
+		try {
+			this.reader.close();
+		} catch (final IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Emits the next line of the file as a single-value tuple; emits nothing once the end of the file is reached.
+	 */
+	@Override
+	public void nextTuple() {
+		final String nextLine;
+		try {
+			nextLine = this.reader.readLine();
+		} catch (final IOException e) {
+			throw new RuntimeException(e);
+		}
+		if (nextLine != null) {
+			this.collector.emit(new Values(nextLine));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
new file mode 100644
index 0000000..3e6081c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Values;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+/**
+ * Implements a Storm Spout that emits the elements of the array handed to the constructor, one element per
+ * {@link #nextTuple()} call (for example {@link WordCountData#WORDS}).
+ */
+public final class StormInMemorySpout extends AbstractStormSpout {
+	private static final long serialVersionUID = -4008858647468647019L;
+
+	/** The data to emit. */
+	private final String[] source;
+	/** Index of the next element of {@link #source} to emit. */
+	private int counter = 0;
+
+	/**
+	 * @param source
+	 * 		the elements to emit, in order
+	 */
+	public StormInMemorySpout(String[] source) {
+		this.source = source;
+	}
+
+	/**
+	 * Emits the next element of the configured source as a single-value tuple; emits nothing once all elements were
+	 * emitted.
+	 */
+	@Override
+	public void nextTuple() {
+		// read from the configured source; it was previously ignored in favour of WordCountData.WORDS
+		if (this.counter < this.source.length) {
+			this.collector.emit(new Values(this.source[this.counter++]));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
new file mode 100644
index 0000000..f22fab6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public class TupleOutputFormatter implements OutputFormatter, Serializable {
+
+	@Override
+	public String format(Object input) {
+		Tuple inputTuple = (Tuple) input;
+		StringBuilder stringBuilder = new StringBuilder();
+		stringBuilder.append("(");
+		for (final Object attribute : inputTuple.getValues()) {
+			stringBuilder.append(attribute);
+			stringBuilder.append(",");
+		}
+		stringBuilder.replace(stringBuilder.length() - 1, stringBuilder.length(), ")");
+		return stringBuilder.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
index ab8d027..606a3ce 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -99,13 +99,13 @@ public class BoltTokenizerWordCount {
 				textPath = args[0];
 				outputPath = args[1];
 			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
+				System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>");
 				return false;
 			}
 		} else {
-			System.out.println("Executing WordCount example with built-in default data");
+			System.out.println("Executing BoltTokenizerWordCount example with built-in default data");
 			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: WordCount <text path> <result path>");
+			System.out.println("  Usage: BoltTokenizerWordCount <text path> <result path>");
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index 4178efa..0ae51c6 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.util.StormFileSpout;
+import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
 import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -123,13 +123,13 @@ public class SpoutSourceWordCount {
 				textPath = args[0];
 				outputPath = args[1];
 			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
+				System.err.println("Usage: SpoutSourceWordCount <text path> <result path>");
 				return false;
 			}
 		} else {
-			System.out.println("Executing WordCount example with built-in default data");
+			System.out.println("Executing SpoutSourceWordCount example with built-in default data");
 			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: WordCount <text path> <result path>");
+			System.out.println("  Usage: SpoutSourceWordCount <text path> <result path>");
 		}
 		return true;
 	}
@@ -144,7 +144,7 @@ public class SpoutSourceWordCount {
 					TypeExtractor.getForClass(String.class)).setParallelism(1);
 		}
 
-		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(), true),
+		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
 				TypeExtractor.getForClass(String.class));
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
index 7b4f471..a6b64d2 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
@@ -65,11 +65,11 @@ public class StormWordCountLocal {
 		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
 		cluster.submitTopology(topologyId, null, builder.createTopology());
 
-		Utils.sleep(5 * 1000);
+		Utils.sleep(100 * 1000);
 
 		// TODO kill does no do anything so far
-		cluster.killTopology(topologyId);
-		cluster.shutdown();
+		//cluster.killTopology(topologyId);
+		//cluster.shutdown();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
index 975b039..d39a526 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -24,11 +24,11 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 import org.apache.flink.stormcompatibility.util.OutputFormatter;
 import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
 import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
+import org.apache.flink.stormcompatibility.util.StormFileSpout;
+import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormInMemorySpout;
-import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountOutputFormatter;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
@@ -52,7 +52,7 @@ public class WordCountTopology {
 	public final static String tokenierzerId = "tokenizer";
 	public final static String counterId = "counter";
 	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new WordCountOutputFormatter();
+	private final static OutputFormatter formatter = new TupleOutputFormatter();
 
 	public static FlinkTopologyBuilder buildTopology() {
 
@@ -65,7 +65,7 @@ public class WordCountTopology {
 			final String inputFile = tokens[tokens.length - 1];
 			builder.setSpout(spoutId, new StormFileSpout(inputFile));
 		} else {
-			builder.setSpout(spoutId, new StormInMemorySpout());
+			builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
 		}
 
 		// split up the lines in pairs (2-tuples) containing: (word,1)
@@ -104,13 +104,13 @@ public class WordCountTopology {
 				textPath = args[0];
 				outputPath = args[1];
 			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
+				System.err.println("Usage: StormWordCount* <text path> <result path>");
 				return false;
 			}
 		} else {
-			System.out.println("Executing WordCount example with built-in default data");
+			System.out.println("Executing StormWordCount* example with built-in default data");
 			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: WordCount <text path> <result path>");
+			System.out.println("  Usage: StormWordCount* <text path> <result path>");
 		}
 
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
index f1122ab..214ca5e 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
@@ -76,7 +76,8 @@ public class StormBoltCounter implements IRichBolt {
 	}
 
 	/**
-	 * A counter helper to emit immutable tuples to the given collector and avoid unnecessary object creating/deletion.
+	 * A counter helper to emit immutable tuples to the given stormCollector and avoid unnecessary object
+	 * creating/deletion.
 	 *
 	 * @author mjsax
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java
deleted file mode 100644
index 7b1daef..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Implements a Storm Spout that reads data from a given local file.
- */
-public final class StormFileSpout extends AbstractStormSpout {
-	private static final long serialVersionUID = -6996907090003590436L;
-
-	private final String path;
-	private BufferedReader reader;
-
-	public StormFileSpout(final String path) {
-		this.path = path;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-		super.open(conf, context, collector);
-		try {
-			this.reader = new BufferedReader(new FileReader(this.path));
-		} catch (final FileNotFoundException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void close() {
-		if (this.reader != null) {
-			try {
-				this.reader.close();
-			} catch (final IOException e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-
-	@Override
-	public void nextTuple() {
-		String line;
-		try {
-			line = this.reader.readLine();
-			if (line != null) {
-				this.collector.emit(new Values(line));
-			}
-		} catch (final IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java
deleted file mode 100644
index 179fbcd..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.tuple.Values;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.AbstractStormSpout;
-
-/**
- * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
- */
-public final class StormInMemorySpout extends AbstractStormSpout {
-	private static final long serialVersionUID = -4008858647468647019L;
-
-	private int counter = 0;
-
-	@Override
-	public void nextTuple() {
-		if (this.counter < WordCountData.WORDS.length) {
-			this.collector.emit(new Values(WordCountData.WORDS[this.counter++]));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountOutputFormatter.java
deleted file mode 100644
index 37936ba..0000000
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountOutputFormatter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-
-import java.io.Serializable;
-
-public class WordCountOutputFormatter implements OutputFormatter, Serializable {
-
-	@Override
-	public String format(Object input) {
-		Tuple inputTuple = (Tuple) input;
-		StringBuilder stringBuilder = new StringBuilder();
-		stringBuilder.append("(");
-		for (final Object attribute : inputTuple.getValues()) {
-			stringBuilder.append(attribute);
-			stringBuilder.append(",");
-		}
-		stringBuilder.replace(stringBuilder.length() - 1, stringBuilder.length(), ")");
-		return stringBuilder.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
new file mode 100644
index 0000000..e384f34
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.StormBoltExclamation;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormBoltExclamationITCase extends StreamingProgramTestBase {
+	// Integration test: hands an input text file to StormBoltExclamation.main and checks the result it produces.
+	protected String textPath; // input file path; assigned in preSubmit()
+	protected String resultPath; // output path passed to the program; assigned in preSubmit()
+	// Stores WordCountData.TEXT through createTempFile("text.txt", ...) and obtains the "result" path.
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+	// Checks resultPath against ExclamationData.TEXT_WITH_EXCLAMATIONS (helper name suggests a per-line comparison; TODO confirm).
+	@Override
+	protected void postSubmit() throws Exception {
+		this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+	}
+	// Runs the example's main with the argument array {textPath, resultPath}.
+	@Override
+	protected void testProgram() throws Exception {
+		StormBoltExclamation.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
new file mode 100644
index 0000000..e3813c4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
+import org.apache.flink.stormcompatibility.excamation.StormExclamationLocal;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormExclamationLocalITCase extends StreamingProgramTestBase {
+	// Integration test: hands an input text file to StormExclamationLocal.main and checks the result it produces.
+	protected String textPath; // input file path; assigned in preSubmit()
+	protected String resultPath; // output path passed to the program; assigned in preSubmit()
+	// Registers a FlinkTestCluster with FlinkLocalCluster, then prepares the input file and the "result" path.
+	@Override
+	protected void preSubmit() throws Exception {
+		FlinkLocalCluster.initialize(new FlinkTestCluster()); // NOTE(review): presumably makes the example run on the test cluster; confirm
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+	// Checks resultPath against ExclamationData.TEXT_WITH_EXCLAMATIONS (helper name suggests a per-line comparison; TODO confirm).
+	@Override
+	protected void postSubmit() throws Exception {
+		this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+	}
+	// Runs the example's main with the argument array {textPath, resultPath}.
+	@Override
+	protected void testProgram() throws Exception {
+		StormExclamationLocal.main(new String[]{this.textPath, this.resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
new file mode 100644
index 0000000..06c5d9a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.StormSpoutExclamation;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormSpoutExclamationITCase extends StreamingProgramTestBase {
+	// Integration test: hands an input text file to StormSpoutExclamation.main and checks the result it produces.
+	protected String textPath; // input file path; assigned in preSubmit()
+	protected String resultPath; // output path passed to the program; assigned in preSubmit()
+	// Stores WordCountData.TEXT through createTempFile("text.txt", ...) and obtains the "result" path.
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+	// Checks resultPath against ExclamationData.TEXT_WITH_EXCLAMATIONS (helper name suggests a per-line comparison; TODO confirm).
+	@Override
+	protected void postSubmit() throws Exception {
+		this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+	}
+	// Runs the example's main with the argument array {textPath, resultPath}.
+	@Override
+	protected void testProgram() throws Exception {
+		StormSpoutExclamation.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e5be41/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
new file mode 100644
index 0000000..8b823b5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation.util;
+
+public class ExclamationData {
+	// Constant holder for expected exclamation output: every line ends with "!!!!!!"; the last line has no trailing newline.
+	public static final String TEXT_WITH_EXCLAMATIONS =
+			"Goethe - Faust: Der Tragoedie erster Teil!!!!!!\n"
+					+ "Prolog im Himmel.!!!!!!\n"
+					+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei!!!!!!\n"
+					+ "Erzengel treten vor.!!!!!!\n"
+					+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,!!!!!!\n"
+					+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick!!!!!!\n"
+					+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich!!!!!!\n"
+					+ "hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n"
+					+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde!!!!!!\n"
+					+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es!!!!!!\n"
+					+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und!!!!!!\n"
+					+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.!!!!!!\n"
+					+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land!!!!!!\n"
+					+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.!!!!!!\n"
+					+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch!!!!!!\n"
+					+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.!!!!!!\n"
+					+ "ZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden!!!!!!\n"
+					+ "mag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n"
+					+ "MEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie!!!!!!\n"
+					+ "alles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So!!!!!!\n"
+					+ "siehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte!!!!!!\n"
+					+ "machen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte!!!!!!\n"
+					+ "dich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von!!!!!!\n"
+					+ "Sonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die!!!!!!\n"
+					+ "Menschen plagen. Der kleine Gott der Welt bleibt stets von gleichem!!!!!!\n"
+					+ "Schlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser!!!!!!\n"
+					+ "wuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;!!!!!!\n"
+					+ "Er nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier!!!!!!\n"
+					+ "zu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der!!!!!!\n"
+					+ "langbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im!!!!!!\n"
+					+ "Gras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In!!!!!!\n"
+					+ "jeden Quark begraebt er seine Nase.!!!!!!\n"
+					+ "DER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer!!!!!!\n"
+					+ "anzuklagen? Ist auf der Erde ewig dir nichts recht?!!!!!!\n"
+					+ "MEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich!!!!!!\n"
+					+ "schlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar!!!!!!\n"
+					+ "die armen selbst nicht plagen.!!!!!!\n" + "DER HERR: Kennst du den Faust?!!!!!!\n"
+					+ "MEPHISTOPHELES: Den Doktor?!!!!!!\n"
+					+ "DER HERR: Meinen Knecht!!!!!!!\n"
+					+ "MEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch!!!!!!\n"
+					+ "ist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er!!!!!!\n"
+					+ "ist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten!!!!!!\n"
+					+ "Sterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne!!!!!!\n"
+					+ "Befriedigt nicht die tiefbewegte Brust.!!!!!!\n"
+					+ "DER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in!!!!!!\n"
+					+ "die Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das!!!!!!\n"
+					+ "Bluet und Frucht die kuenft'gen Jahre zieren.!!!!!!\n"
+					+ "MEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr!!!!!!\n"
+					+ "mir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.!!!!!!\n"
+					+ "DER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,!!!!!!\n"
+					+ "Es irrt der Mensch so lang er strebt.!!!!!!\n"
+					+ "MEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals!!!!!!\n"
+					+ "gern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer!!!!!!\n"
+					+ "einem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.!!!!!!\n"
+					+ "DER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem!!!!!!\n"
+					+ "Urquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit!!!!!!\n"
+					+ "herab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in!!!!!!\n"
+					+ "seinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.!!!!!!\n"
+					+ "MEPHISTOPHELES: Schon gut! nur dauert es nicht lange. Mir ist fuer meine!!!!!!\n"
+					+ "Wette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir!!!!!!\n"
+					+ "Triumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine!!!!!!\n"
+					+ "Muhme, die beruehmte Schlange.!!!!!!\n"
+					+ "DER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen!!!!!!\n"
+					+ "nie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am!!!!!!\n"
+					+ "wenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,!!!!!!\n"
+					+ "er liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen!!!!!!\n"
+					+ "zu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten!!!!!!\n"
+					+ "Goettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das!!!!!!\n"
+					+ "ewig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was!!!!!!\n"
+					+ "in schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!!!!!!!\n"
+					+ "(Der Himmel schliesst, die Erzengel verteilen sich.)!!!!!!\n"
+					+ "MEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und!!!!!!\n"
+					+ "huete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,!!!!!!\n"
+					+ "So menschlich mit dem Teufel selbst zu sprechen.!!!!!!";
+}