Posted to commits@flink.apache.org by mb...@apache.org on 2015/08/07 14:51:02 UTC

[1/2] flink git commit: [FLINK-2243] [storm-compat] Demonstrating finite Storm spout functionality on exclamation example -minor renaming -improving JavaDocs

Repository: flink
Updated Branches:
  refs/heads/master f1dd914de -> dba2946f4


[FLINK-2243] [storm-compat] Demonstrating finite Storm spout functionality on exclamation example
-minor renaming
-improving JavaDocs

Closes #853


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dba2946f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dba2946f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dba2946f

Branch: refs/heads/master
Commit: dba2946f465a72f18b6452e7ab34f9198b71a908
Parents: 6d9eeb5
Author: szape <ne...@gmail.com>
Authored: Fri Jun 19 10:43:10 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Aug 7 14:50:02 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |  51 +++++++
 .../excamation/ExclamationTopology.java         |  47 +++++--
 .../excamation/ExclamationWithStormBolt.java    | 131 +++++++++++++++++
 .../excamation/ExclamationWithStormSpout.java   | 140 +++++++++++++++++++
 .../excamation/StormBoltExclamation.java        | 113 ---------------
 .../excamation/StormExclamationLocal.java       |  25 +++-
 .../StormExclamationRemoteByClient.java         |  22 +++
 .../StormExclamationRemoteBySubmitter.java      |  21 +++
 .../excamation/StormSpoutExclamation.java       | 118 ----------------
 .../util/FiniteStormFileSpout.java              |  97 +++++++++++++
 .../util/FiniteStormInMemorySpout.java          |  48 +++++++
 .../util/OutputFormatter.java                   |  11 +-
 .../util/RawOutputFormatter.java                |  32 -----
 .../util/SimpleOutputFormatter.java             |  15 +-
 .../wordcount/BoltTokenizerWordCount.java       |   2 +-
 .../wordcount/SpoutSourceWordCount.java         |   4 +-
 .../wordcount/StormWordCountLocal.java          |   2 +-
 .../wordcount/StormWordCountRemoteByClient.java |   2 +-
 .../StormWordCountRemoteBySubmitter.java        |   2 +-
 .../wordcount/WordCountTopology.java            |   2 +-
 .../ExclamationWithStormBoltITCase.java         |  47 +++++++
 .../ExclamationWithStormSpoutITCase.java        |  47 +++++++
 .../exclamation/StormBoltExclamationITCase.java |  47 -------
 .../StormSpoutExclamationITCase.java            |  47 -------
 24 files changed, 689 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index b8fe66e..1390b92 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -167,6 +167,57 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
 
 See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
 
+# Flink Extensions
+
+## Finite Storm Spouts
+
+In Flink streaming, sources can be finite, i.e., they emit a finite number of records and stop after emitting the last one. Storm spouts, however, always emit infinite streams.
+The bridge between the two approaches is the `FiniteStormSpout` interface, which, in addition to the methods of `IRichSpout`, declares a `reachedEnd()` method in which the user specifies a stopping condition.
+The user can create a finite Storm spout by implementing this interface instead of `IRichSpout` and additionally implementing the `reachedEnd()` method.
+When used as part of a Flink topology, a `FiniteStormSpout` should be wrapped in a `FiniteStormSpoutWrapper`.
+
+Although finite Storm spouts are not necessary to embed Storm spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they may come in handy:
+
+ * making a native Storm spout behave like a finite Flink source with minimal modifications
+ * processing a stream only for some time, after which the spout stops automatically
+ * reading a file into a stream
+ * testing
+
+A `FiniteStormSpout` can still be used as a normal, infinite Storm spout by changing its wrapper class to `StormSpoutWrapper` in the Flink topology.
+
+An example of a finite Storm spout that emits records for 10 seconds only:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+public class TimedFiniteStormSpout extends AbstractStormSpout implements FiniteStormSpout {
+	[...]
+	private long starttime = System.currentTimeMillis();
+
+	public boolean reachedEnd() {
+		return System.currentTimeMillis() - starttime > 10000L;
+	}
+	[...]
+}
+~~~
+</div>
+</div>
+
+Using a `FiniteStormSpout` in a Flink topology:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> rawInput = env.addSource(
+	new FiniteStormSpoutWrapper<String>(new TimedFiniteStormSpout(), true),
+	TypeExtractor.getForClass(String.class));
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+
 # Storm Compatibility Examples
 
 You can find more examples in Maven module `flink-storm-compatibilty-examples`.

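The `reachedEnd()` contract described in the documentation hunk above can be illustrated without any Storm or Flink dependency. The following is a minimal sketch under stated assumptions: `FiniteSpout`, `InMemorySpout`, and `drain` are hypothetical stand-ins invented for this illustration, not the real `FiniteStormSpout` or `FiniteStormSpoutWrapper` classes. In particular, a real Storm spout's `nextTuple()` returns `void` and emits through a collector, whereas here it returns the record directly to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

public class FiniteSpoutSketch {

    /** Hypothetical stand-in for the finite-spout contract (not the real interface). */
    public interface FiniteSpout {
        /** Produces the next record; simplified to return it instead of emitting via a collector. */
        String nextTuple();

        /** Returns true once the spout has no more records to produce. */
        boolean reachedEnd();
    }

    /** A finite source backed by an in-memory array, in the spirit of an in-memory spout. */
    public static final class InMemorySpout implements FiniteSpout {
        private final String[] source;
        private int counter = 0;

        public InMemorySpout(String[] source) {
            this.source = source;
        }

        @Override
        public String nextTuple() {
            return source[counter++];
        }

        @Override
        public boolean reachedEnd() {
            return counter >= source.length;
        }
    }

    /** Roughly what a finite wrapper has to do: poll the spout until it reports its end. */
    public static List<String> drain(FiniteSpout spout) {
        List<String> out = new ArrayList<>();
        while (!spout.reachedEnd()) {
            out.add(spout.nextTuple());
        }
        return out;
    }

    public static void main(String[] args) {
        String[] lines = {"to be", "or not to be"};
        for (String record : drain(new InMemorySpout(lines))) {
            System.out.println(record + "!!!");
        }
    }
}
```

The stopping condition is checked before each call to `nextTuple()`, so an empty source yields no records at all. A time-based condition like the one in `TimedFiniteStormSpout` would only change the body of `reachedEnd()`.
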
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
index a5bb571..b7c98a8 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
@@ -20,15 +20,32 @@ package org.apache.flink.stormcompatibility.excamation;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
 import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.RawOutputFormatter;
+import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter;
 import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
 import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
 
 /**
- * This is a basic example of a Storm topology.
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamation[Local|RemoteByClient|RemoteBySubmitter] &lt;text path&gt;
+ * &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>construct a regular Storm topology as a Flink program</li>
+ * <li>make use of the FiniteStormSpout interface</li>
+ * </ul>
  */
 public class ExclamationTopology {
 
@@ -36,7 +53,7 @@ public class ExclamationTopology {
 	public final static String firstBoltId = "exclamation1";
 	public final static String secondBoltId = "exclamation2";
 	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new RawOutputFormatter();
+	private final static OutputFormatter formatter = new SimpleOutputFormatter();
 
 	public static FlinkTopologyBuilder buildTopology() {
 		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
@@ -46,9 +63,9 @@ public class ExclamationTopology {
 			// read the text file from given input path
 			final String[] tokens = textPath.split(":");
 			final String inputFile = tokens[tokens.length - 1];
-			builder.setSpout(spoutId, new StormFileSpout(inputFile));
+			builder.setSpout(spoutId, new FiniteStormFileSpout(inputFile));
 		} else {
-			builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
+			builder.setSpout(spoutId, new FiniteStormInMemorySpout(WordCountData.WORDS));
 		}
 
 		builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
@@ -59,9 +76,11 @@ public class ExclamationTopology {
 			// read the text file from given input path
 			final String[] tokens = outputPath.split(":");
 			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(secondBoltId);
+			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
+					.shuffleGrouping(secondBoltId);
 		} else {
-			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(secondBoltId);
+			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
+					.shuffleGrouping(secondBoltId);
 		}
 
 		return builder;
@@ -84,13 +103,17 @@ public class ExclamationTopology {
 				textPath = args[0];
 				outputPath = args[1];
 			} else {
-				System.err.println("Usage: StormExclamation* <text path> <result path>");
+				System.err.println(
+						"Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text " +
+								"path> <result path>");
 				return false;
 			}
 		} else {
-			System.out.println("Executing StormExclamation* example with built-in default data");
+			System.out.println("Executing StormExclamation example with built-in default data");
 			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: StormExclamation* <text path> <result path>");
+			System.out.println(
+					"  Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>" +
+							" <result path>");
 		}
 
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
new file mode 100644
index 0000000..7bcb7f9
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is a Flink streaming program that embeds a Storm bolt.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationWithStormBolt &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm bolt within a Flink Streaming program</li>
+ * </ul>
+ */
+public class ExclamationWithStormBolt {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<String> exclaimed = text
+				.transform("StormBoltTokenizer",
+						TypeExtractor.getForObject(""),
+						new StormBoltWrapper<String, String>(new ExclamationBolt(), true))
+				.map(new ExclamationMap());
+
+		// emit result
+		if (fileOutput) {
+			exclaimed.writeAsText(outputPath);
+		} else {
+			exclaimed.print();
+		}
+
+		// execute program
+		env.execute("Streaming Exclamation with Storm bolt");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class ExclamationMap implements MapFunction<String, String> {
+
+		@Override
+		public String map(String value) throws Exception {
+			return value + "!!!";
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: ExclamationWithStormBolt <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing ExclamationWithStormBolt example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: ExclamationWithStormBolt <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		}
+
+		return env.fromElements(WordCountData.WORDS);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
new file mode 100644
index 0000000..f027eae
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is a Flink streaming program that uses a Storm spout as source.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationWithStormSpout &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm spout within a Flink Streaming program</li>
+ * <li>make use of the FiniteStormSpout interface</li>
+ * </ul>
+ */
+public class ExclamationWithStormSpout {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<String> exclaimed = text
+				.map(new ExclamationMap())
+				.map(new ExclamationMap());
+
+		// emit result
+		if (fileOutput) {
+			exclaimed.writeAsText(outputPath);
+		} else {
+			exclaimed.print();
+		}
+
+		// execute program
+		env.execute("Streaming Exclamation with Storm spout source");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class ExclamationMap implements MapFunction<String, String> {
+
+		@Override
+		public String map(String value) throws Exception {
+			return value + "!!!";
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: ExclamationWithStormSpout <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing ExclamationWithStormSpout example with built-in default " +
+					"data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: ExclamationWithStormSpout <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			final String[] tokens = textPath.split(":");
+			final String localFile = tokens[tokens.length - 1];
+			return env.addSource(
+					new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile), true),
+					TypeExtractor.getForClass(String.class)).setParallelism(1);
+		}
+
+		return env.addSource(
+				new FiniteStormSpoutWrapper<String>(
+						new FiniteStormInMemorySpout(WordCountData.WORDS), true),
+				TypeExtractor.getForClass(String.class)).setParallelism(1);
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
deleted file mode 100644
index 52e740c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class StormBoltExclamation {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		final DataStream<String> text = getTextDataStream(env);
-
-		final DataStream<String> exclaimed = text
-				.transform("StormBoltTokenizer",
-						TypeExtractor.getForObject(""),
-						new StormBoltWrapper<String, String>(new ExclamationBolt(), true))
-				.map(new ExclamationMap());
-
-		// emit result
-		if (fileOutput) {
-			exclaimed.writeAsText(outputPath);
-		} else {
-			exclaimed.print();
-		}
-
-		// execute program
-		env.execute("Streaming WordCount with Storm bolt tokenizer");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private static class ExclamationMap implements MapFunction<String, String> {
-
-		@Override
-		public String map(String value) throws Exception {
-			return value + "!!!";
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: StormBoltExclamation <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing StormBoltExclamation example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: StormBoltExclamation <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		}
-
-		return env.fromElements(WordCountData.WORDS);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
index a25e5e0..5941ff0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
@@ -21,6 +21,27 @@ import backtype.storm.utils.Utils;
 import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
+ * submitted to Flink for execution in the same way as to a Storm {@link LocalCluster}.
+ * <p/>
+ * This example shows how to run the program directly within Java; thus, it cannot be used to submit a
+ * {@link StormTopology} via Flink command line clients (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamationLocal &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink</li>
+ * </ul>
+ */
 public class StormExclamationLocal {
 
 	public final static String topologyId = "Streaming Exclamation";
@@ -43,10 +64,6 @@ public class StormExclamationLocal {
 		cluster.submitTopology(topologyId, null, builder.createTopology());
 
 		Utils.sleep(10 * 1000);
-
-		// TODO kill does no do anything so far
-		cluster.killTopology(topologyId);
-		cluster.shutdown();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
index 3f55316..0f64301 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
@@ -25,6 +25,28 @@ import backtype.storm.utils.Utils;
 import org.apache.flink.stormcompatibility.api.FlinkClient;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
+ * submitted to Flink for execution in the same way as to a Storm cluster, similar to using a
+ * {@link NimbusClient}. The Flink cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java; thus, it cannot be used to submit a
+ * {@link StormTopology} via Flink command line clients (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamationRemoteByClient &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
+ * </ul>
+ */
 public class StormExclamationRemoteByClient {
 
 	public final static String topologyId = "Streaming Exclamation";

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
index 728c5c7..d580520 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
@@ -22,6 +22,27 @@ import org.apache.flink.stormcompatibility.api.FlinkClient;
 import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 
+/**
+ * Implements the "Exclamation" program that appends five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
+ * submitted to Flink for execution in the same way as to a Storm cluster, using a submitter similar to
+ * {@link StormSubmitter}. The Flink cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java as well as via Flink's command line client (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamationRemoteBySubmitter &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
+ * </ul>
+ */
 public class StormExclamationRemoteBySubmitter {
 
 	public final static String topologyId = "Streaming Exclamation";

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
deleted file mode 100644
index 2569fa7..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
-import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class StormSpoutExclamation {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		final DataStream<String> text = getTextDataStream(env);
-
-		final DataStream<String> exclaimed = text
-				.map(new ExclamationMap())
-				.map(new ExclamationMap());
-
-		// emit result
-		if (fileOutput) {
-			exclaimed.writeAsText(outputPath);
-		} else {
-			exclaimed.print();
-		}
-
-		// execute program
-		env.execute("Streaming Exclamation with Storm spout source");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private static class ExclamationMap implements MapFunction<String, String> {
-
-		@Override
-		public String map(String value) throws Exception {
-			return value + "!!!";
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: StormSpoutExclamation <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing StormSpoutExclamation example with built-in default data");
-			System.out.println("  Provide parameters to read input data from a file");
-			System.out.println("  Usage: StormSpoutExclamation <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			final String[] tokens = textPath.split(":");
-			final String localFile = tokens[tokens.length - 1];
-			return env.addSource(
-					new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
-					TypeExtractor.getForClass(String.class)).setParallelism(1);
-		}
-
-		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
-				TypeExtractor.getForClass(String.class));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
new file mode 100644
index 0000000..d45ad76
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a Storm Spout that reads data from a given local file. The spout stops automatically
+ * when it reaches the end of the file.
+ */
+public class FiniteStormFileSpout extends AbstractStormSpout implements FiniteStormSpout {
+	private static final long serialVersionUID = -6996907090003590436L;
+
+	private final String path;
+	private BufferedReader reader;
+	private String line;
+	private boolean newLineRead;
+
+	public FiniteStormFileSpout(final String path) {
+		this.path = path;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context,
+			final SpoutOutputCollector collector) {
+		super.open(conf, context, collector);
+		try {
+			this.reader = new BufferedReader(new FileReader(this.path));
+		} catch (final FileNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+		newLineRead = false;
+	}
+
+	@Override
+	public void close() {
+		if (this.reader != null) {
+			try {
+				this.reader.close();
+			} catch (final IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+
+	@Override
+	public void nextTuple() {
+		this.collector.emit(new Values(line));
+		newLineRead = false;
+	}
+
+	/**
+	 * May be called any number of times before nextTuple(); only the first call reads a new line.
+	 */
+	public boolean reachedEnd() {
+		try {
+			readLine();
+		} catch (IOException e) {
+			throw new RuntimeException("Exception occurred while reading file " + path, e);
+		}
+		return line == null;
+	}
+
+	private void readLine() throws IOException {
+		if (!newLineRead) {
+			line = reader.readLine();
+			newLineRead = true;
+		}
+	}
+
+}
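
The look-ahead contract between reachedEnd() and nextTuple() in the file above can be illustrated without any Storm dependency. The following is a minimal sketch, not part of the commit: the class LookAheadLineSource and its methods next() and drain() are hypothetical names chosen for illustration. It buffers one line so that repeated reachedEnd() calls do not consume input. As a deliberate difference from the diff, next() also fills the buffer itself, so it stays correct even when reachedEnd() was not called first.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a finite source; it only mirrors the buffering idea.
public class LookAheadLineSource {

    private final BufferedReader reader;
    private String line;                  // the buffered look-ahead line, null at end of input
    private boolean lineBuffered = false; // true while 'line' has not been handed out yet

    public LookAheadLineSource(BufferedReader reader) {
        this.reader = reader;
    }

    // Safe to call repeatedly: reads at most one line until next() consumes it.
    public boolean reachedEnd() {
        fill();
        return line == null;
    }

    // Returns the buffered line (or null at end of input) and marks it as consumed.
    public String next() {
        fill(); // assumption of this sketch: tolerate a missing reachedEnd() call
        lineBuffered = false;
        return line;
    }

    private void fill() {
        if (!lineBuffered) {
            try {
                line = reader.readLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            lineBuffered = true;
        }
    }

    // Drives the source the way a finite wrapper might: poll reachedEnd(), then emit.
    public static List<String> drain(String text) {
        LookAheadLineSource source =
                new LookAheadLineSource(new BufferedReader(new StringReader(text)));
        List<String> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            source.reachedEnd(); // a repeated check must not skip a line
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain("first\nsecond\nthird"));
    }
}
```

The boolean flag is what makes the end-of-input check idempotent; without it, every reachedEnd() call would silently drop one line of input.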

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
new file mode 100644
index 0000000..899c569
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Values;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+
+/**
+ * Implements a Storm Spout that reads String[] data stored in memory. The spout stops
+ * automatically when it has emitted all of the data.
+ */
+public class FiniteStormInMemorySpout extends AbstractStormSpout implements FiniteStormSpout {
+
+	private static final long serialVersionUID = -4008858647468647019L;
+
+	private String[] source;
+	private int counter = 0;
+
+	public FiniteStormInMemorySpout(String[] source) {
+		this.source = source;
+	}
+
+	@Override
+	public void nextTuple() {
+		this.collector.emit(new Values(source[this.counter++]));
+	}
+
+	public boolean reachedEnd() {
+		return counter >= source.length;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
index bfc3135..ec9adfe 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
@@ -18,12 +18,19 @@
 
 package org.apache.flink.stormcompatibility.util;
 
-import java.io.Serializable;
-
 import backtype.storm.tuple.Tuple;
 
+import java.io.Serializable;
+
 public interface OutputFormatter extends Serializable {
 
+	/**
+	 * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output
+	 * tuples before writing them out to a file or to the console.
+	 *
+	 * @param input The tuple to be formatted
+	 * @return The string result of the formatting
+	 */
 	public String format(Tuple input);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
deleted file mode 100644
index 7faf6cd..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Tuple;
-
-public class RawOutputFormatter implements OutputFormatter {
-	private static final long serialVersionUID = 8685668993521259832L;
-
-	@Override
-	public String format(final Tuple input) {
-		assert (input.size() == 1);
-		return input.getValue(0).toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
index ccb617b..0702e94 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -23,9 +23,20 @@ import backtype.storm.tuple.Tuple;
 public class SimpleOutputFormatter implements OutputFormatter {
 	private static final long serialVersionUID = 6349573860144270338L;
 
+	/**
+	 * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that
+	 * field. This method is used for formatting raw outputs wrapped in tuples, before writing them
+	 * out to a file or to the console.
+	 *
+	 * @param input
+	 * 		The tuple to be formatted
+	 * @return The string result of the formatting
+	 */
 	@Override
 	public String format(final Tuple input) {
-		return input.getValues().toString();
+		if (input.getValues().size() != 1) {
+			throw new RuntimeException("The output is not raw");
+		}
+		return input.getValue(0).toString();
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
index 8f4503f..eab58f5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  * <p/>
  * This example shows how to:
  * <ul>
- * <li>use a Storm bolt within a Flink Streaming program.
+ * <li>use a Storm bolt within a Flink Streaming program.</li>
  * </ul>
  */
 public class BoltTokenizerWordCount {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index 361d83a..4c012d8 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -43,7 +43,7 @@ import org.apache.flink.util.Collector;
  * <p/>
  * This example shows how to:
  * <ul>
- * <li>use a Storm bolt within a Flink Streaming program.
+ * <li>use a Storm spout within a Flink Streaming program.</li>
  * </ul>
  */
 public class SpoutSourceWordCount {
@@ -145,7 +145,7 @@ public class SpoutSourceWordCount {
 		}
 
 		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
-				TypeExtractor.getForClass(String.class));
+				TypeExtractor.getForClass(String.class)).setParallelism(1);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
index 3fbd5b7..836c8e9 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
@@ -42,7 +42,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
  * <p/>
  * This example shows how to:
  * <ul>
- * <li>run a regular Storm program locally on Flink
+ * <li>run a regular Storm program locally on Flink</li>
  * </ul>
  */
 public class StormWordCountLocal {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
index 9e56c14..0bbe11b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
@@ -46,7 +46,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
  * <p/>
  * This example shows how to:
  * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
  * </ul>
  */
 public class StormWordCountRemoteByClient {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
index a1fb79d..264dc41 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
@@ -42,7 +42,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
  * <p/>
  * This example shows how to:
  * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
  * </ul>
  */
 public class StormWordCountRemoteBySubmitter {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
index f028266..367ca9e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -47,7 +47,7 @@ import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTok
  * <p/>
  * This example shows how to:
  * <ul>
- * <li>how to construct a regular Storm topology as Flink program
+ * <li>construct a regular Storm topology as a Flink program</li>
  * </ul>
  */
 public class WordCountTopology {

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
new file mode 100644
index 0000000..930f87b
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormBolt;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class ExclamationWithStormBoltITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
new file mode 100644
index 0000000..4c515ce
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormSpout;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class ExclamationWithStormSpoutITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExclamationWithStormSpout.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
deleted file mode 100644
index 75dd5fc..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.excamation.StormBoltExclamation;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormBoltExclamationITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StormBoltExclamation.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
deleted file mode 100644
index 2b08b4b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.excamation.StormSpoutExclamation;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormSpoutExclamationITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StormSpoutExclamation.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}


[2/2] flink git commit: [FLINK-2243] [storm-compat] Added finite spout functionality to Storm compatibility layer

Posted by mb...@apache.org.
[FLINK-2243] [storm-compat] Added finite spout functionality to Storm compatibility layer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d9eeb55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d9eeb55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d9eeb55

Branch: refs/heads/master
Commit: 6d9eeb559a895d92bb0a71c6535c55dfb49a16cb
Parents: f1dd914
Author: szape <ne...@gmail.com>
Authored: Fri Jun 19 10:22:04 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Aug 7 14:50:02 2015 +0200

----------------------------------------------------------------------
 .../api/FlinkTopologyBuilder.java               | 13 ++-
 .../wrappers/AbstractStormSpoutWrapper.java     |  1 -
 .../wrappers/FiniteStormSpout.java              | 37 +++++++++
 .../wrappers/FiniteStormSpoutWrapper.java       | 87 ++++++++++++++++++++
 .../wrappers/FiniteStormSpoutWrapperTest.java   | 69 ++++++++++++++++
 5 files changed, 205 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 4ecf4a6..d146250 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -33,6 +33,9 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -93,7 +96,15 @@ public class FlinkTopologyBuilder {
 			 * -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
 			 * the streams
 			 */
-			final DataStreamSource source = env.addSource(new StormSpoutWrapper(userSpout), declarer.getOutputType());
+			AbstractStormSpoutWrapper spoutWrapper;
+
+			if (userSpout instanceof FiniteStormSpout) {
+				spoutWrapper = new FiniteStormSpoutWrapper((FiniteStormSpout) userSpout);
+			} else {
+				spoutWrapper = new StormSpoutWrapper(userSpout);
+			}
+
+			final DataStreamSource source = env.addSource(spoutWrapper, declarer.getOutputType());
 			availableOperators.put(spoutId, source);
 
 			int dop = 1;
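
Note on the hunk above: the wrapper is now chosen from the spout's runtime type. The following is only a minimal, self-contained sketch of that dispatch. `Spout`, `FiniteSpout`, and `pickWrapper` are hypothetical stand-ins rather than the Storm or Flink types, and the method returns a label instead of constructing a real wrapper.

```java
// Stand-in types (assumptions, not the real Storm/Flink classes).
interface Spout {
	void nextTuple();
}

interface FiniteSpout extends Spout {
	boolean reachedEnd();
}

class WrapperSelectionSketch {

	// Mirrors the instanceof check in the hunk: finite spouts get the finite
	// wrapper, everything else falls back to the regular one.
	static String pickWrapper(Spout userSpout) {
		if (userSpout instanceof FiniteSpout) {
			return "FiniteStormSpoutWrapper";
		}
		return "StormSpoutWrapper";
	}

	public static void main(String[] args) {
		Spout endless = () -> { };
		Spout finite = new FiniteSpout() {
			@Override
			public void nextTuple() { }

			@Override
			public boolean reachedEnd() {
				return true;
			}
		};
		System.out.println(pickWrapper(endless));
		System.out.println(pickWrapper(finite));
	}
}
```

Because the check is on the interface, existing spouts that do not implement it keep their previous behaviour.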

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 3021bcb..4e43a8a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -19,7 +19,6 @@ package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.topology.IRichSpout;
-
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
new file mode 100644
index 0000000..58a4f7a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * This interface represents a Storm spout that emits a finite number of records. Common Storm
+ * spouts emit infinite streams by default. To change this behaviour and take advantage of
+ * Flink's finite-source capabilities, the spout should implement this interface. To wrap
+ * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
+ */
+public interface FiniteStormSpout extends IRichSpout {
+
+	/**
+	 * Returns true once the spout has reached the end of its stream.
+	 *
+	 * @return true, if the spout's stream reached its end, false otherwise
+	 */
+	public boolean reachedEnd();
+
+}
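
Note on the interface above: a spout signals that it is finite by answering `reachedEnd()`. The sketch below shows one way such a contract could be implemented for in-memory data. It is an illustration under assumptions: `FiniteSource` and `InMemoryFiniteSource` are hypothetical stand-ins, the Storm `IRichSpout` lifecycle is left out, and emitted records are collected in a list instead of being sent to a collector.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the contract above (assumption: the real interface also
// inherits the full Storm IRichSpout lifecycle, which is omitted here).
interface FiniteSource {
	void nextTuple();

	boolean reachedEnd();
}

// Hypothetical in-memory source: emits each element once, then reports the end.
class InMemoryFiniteSource implements FiniteSource {
	private final String[] lines;
	private final List<String> emitted = new ArrayList<>();
	private int position = 0;

	InMemoryFiniteSource(String... lines) {
		this.lines = lines;
	}

	@Override
	public void nextTuple() {
		// Defensive guard: callers are expected to check reachedEnd() first.
		if (position < lines.length) {
			emitted.add(lines[position++]);
		}
	}

	@Override
	public boolean reachedEnd() {
		return position >= lines.length;
	}

	List<String> emitted() {
		return emitted;
	}
}

class FiniteSourceDemo {
	public static void main(String[] args) {
		InMemoryFiniteSource source = new InMemoryFiniteSource("a", "b", "c");
		while (!source.reachedEnd()) {
			source.nextTuple();
		}
		System.out.println(source.emitted()); // prints [a, b, c]
	}
}
```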

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
new file mode 100644
index 0000000..7913510
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+/**
+ * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped
+ * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link
+ * FiniteStormSpout#reachedEnd()} is true.
+ */
+public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
+	private static final long serialVersionUID = -218340336648247605L;
+
+	private FiniteStormSpout finiteSpout;
+
+	/**
+	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm
+	 * {@link FiniteStormSpout spout} such that it can be used within a Flink streaming
+	 * program. The output type will be one of {@link Tuple1} to {@link Tuple25},
+	 * depending on the spout's declared number of attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link FiniteStormSpout spout} to be used.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within the range
+	 * 		[1;25].
+	 */
+	public FiniteStormSpoutWrapper(FiniteStormSpout spout)
+			throws IllegalArgumentException {
+		super(spout);
+		this.finiteSpout = spout;
+	}
+
+	/**
+	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
+	 * FiniteStormSpout spout} such that it can be used within a Flink streaming program.
+	 *
+	 * If {@code rawOutput} is {@code true} and the spout declares exactly one output
+	 * attribute, the output type can be any type.
+	 * If {@code rawOutput} is {@code false}, the output type will be one of
+	 * {@link Tuple1} to {@link Tuple25}, depending on the spout's declared number of
+	 * attributes.
+	 *
+	 * @param spout
+	 * 		The Storm {@link FiniteStormSpout spout} to be used.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single-attribute output stream should not be of
+	 * 		type {@link Tuple1} but of a raw type.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared
+	 * 		output attributes is not 1,
+	 * 		or if {@code rawOutput} is {@code false} and the number of
+	 * 		declared output attributes is not within the range
+	 * 		[1;25].
+	 */
+	public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final boolean rawOutput)
+			throws IllegalArgumentException {
+		super(spout, rawOutput);
+		this.finiteSpout = spout;
+	}
+
+	/**
+	 * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link
+	 * FiniteStormSpout#reachedEnd()} is true or {@link #cancel()} is called.
+	 */
+	@Override
+	protected void execute() {
+		while (super.isRunning && !finiteSpout.reachedEnd()) {
+			finiteSpout.nextTuple();
+		}
+	}
+
+}
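
Note on `execute()` above: the loop ends either when the spout reports its end or when the source is cancelled. The sketch below reproduces that loop shape in isolation. `ScriptedSpout` and `FiniteLoopSketch` are hypothetical stand-ins, not Flink or Storm classes, and marking the flag `volatile` is a choice made for this sketch; the diff does not show how `isRunning` is declared.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical spout stub: answers reachedEnd() from a fixed script and
// counts nextTuple() calls. Not a Storm or Flink class.
class ScriptedSpout {
	private final Deque<Boolean> answers;
	int nextTupleCalls = 0;

	ScriptedSpout(Boolean... answers) {
		this.answers = new ArrayDeque<>(Arrays.asList(answers));
	}

	boolean reachedEnd() {
		// Once the script is exhausted, report the end to keep the loop finite.
		return answers.isEmpty() || answers.poll();
	}

	void nextTuple() {
		nextTupleCalls++;
	}
}

class FiniteLoopSketch {
	// Cleared by cancel(); volatile here because cancel() may run on another thread.
	private volatile boolean isRunning = true;

	void cancel() {
		isRunning = false;
	}

	// Same shape as the wrapper's loop: stop on cancellation or end of stream.
	void execute(ScriptedSpout spout) {
		while (isRunning && !spout.reachedEnd()) {
			spout.nextTuple();
		}
	}

	public static void main(String[] args) {
		ScriptedSpout threeThenEnd = new ScriptedSpout(false, false, false, true, false);
		new FiniteLoopSketch().execute(threeThenEnd);
		System.out.println(threeThenEnd.nextTupleCalls); // prints 3

		ScriptedSpout alreadyDone = new ScriptedSpout(true, false);
		new FiniteLoopSketch().execute(alreadyDone);
		System.out.println(alreadyDone.nextTupleCalls); // prints 0
	}
}
```

The two scripted sequences correspond to the cases exercised by the unit test in this commit: three records before the end, and a spout that is already at its end.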

http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
new file mode 100644
index 0000000..776e65d
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class FiniteStormSpoutWrapperTest {
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void runAndExecuteTest1() throws Exception {
+
+		FiniteStormSpout stormSpout =
+				mock(FiniteStormSpout.class);
+		when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
+
+		FiniteStormSpoutWrapper<?> wrapper =
+				new FiniteStormSpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		wrapper.run(mock(SourceContext.class));
+		verify(stormSpout, times(3)).nextTuple();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void runAndExecuteTest2() throws Exception {
+
+		FiniteStormSpout stormSpout =
+				mock(FiniteStormSpout.class);
+		when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
+
+		FiniteStormSpoutWrapper<?> wrapper =
+				new FiniteStormSpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+		wrapper.run(mock(SourceContext.class));
+		verify(stormSpout, never()).nextTuple();
+	}
+
+}