You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:31 UTC

[06/15] flink git commit: [Storm Compatibility] Maven module restucturing and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
new file mode 100644
index 0000000..985cd68
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.excamation.operators.ExclamationBolt;
+
+/**
+ * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text files in a streaming
+ * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology} and submitted to
+ * Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}.
+ * <p/>
+ * This example shows how to run program directly within Java, thus it cannot be used to submit a
+ * {@link backtype.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationLocal &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink</li>
+ * </ul>
+ */
+public class ExclamationLocal {
+
+	public final static String topologyId = "Streaming Exclamation";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!ExclamationTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
+
+		// execute program locally
+		Config conf = new Config();
+		conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, conf, builder.createTopology());
+
+		Utils.sleep(10 * 1000);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
new file mode 100644
index 0000000..70d25a2
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.excamation.operators.ExclamationBolt;
+import org.apache.flink.storm.util.FiniteFileSpout;
+import org.apache.flink.storm.util.FiniteInMemorySpout;
+import org.apache.flink.storm.util.OutputFormatter;
+import org.apache.flink.storm.util.SimpleOutputFormatter;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.BoltPrintSink;
+
+/**
+ * Implements the "Exclamation" program that attaches two exclamation marks to every line of a text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] &lt;text path&gt;
+ * &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>construct a regular Storm topology as Flink program</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * </ul>
+ */
+public class ExclamationTopology {
+
+	public final static String spoutId = "source";
+	public final static String firstBoltId = "exclamation1";
+	public final static String secondBoltId = "exclamation2";
+	public final static String sinkId = "sink";
+	private final static OutputFormatter formatter = new SimpleOutputFormatter();
+
+	public static FlinkTopologyBuilder buildTopology() {
+		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+		// get input data
+		if (fileInputOutput) {
+			// read the text file from given input path
+			final String[] tokens = textPath.split(":");
+			final String inputFile = tokens[tokens.length - 1];
+			builder.setSpout(spoutId, new FiniteFileSpout(inputFile));
+		} else {
+			builder.setSpout(spoutId, new FiniteInMemorySpout(WordCountData.WORDS));
+		}
+
+		builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
+		builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId);
+
+		// emit result
+		if (fileInputOutput) {
+			// read the text file from given input path
+			final String[] tokens = outputPath.split(":");
+			final String outputFile = tokens[tokens.length - 1];
+			builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter))
+			.shuffleGrouping(secondBoltId);
+		} else {
+			builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
+			.shuffleGrouping(secondBoltId);
+		}
+
+		return builder;
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInputOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	private static int exclamationNum = 3;
+
+	static int getExclamation() {
+		return exclamationNum;
+	}
+
+	static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileInputOutput = true;
+			if (args.length == 3) {
+				textPath = args[0];
+				outputPath = args[1];
+				exclamationNum = Integer.parseInt(args[2]);
+			} else {
+				System.err.println("Usage: StormExclamation* <text path> <result path>  <number of exclamation marks>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing StormExclamation example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: StormExclamation <text path> <result path> <number of exclamation marks>");
+		}
+
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
new file mode 100644
index 0000000..01ab907
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.excamation.operators.ExclamationBolt;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage:
+ * <code>ExclamationWithmBolt &lt;text path&gt; &lt;result path&gt; &lt;number of exclamation marks&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData} with x=2.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Bolt within a Flink Streaming program</li>
+ * <li>how to configure a Bolt using StormConfig</li>
+ * </ul>
+ */
+public class ExclamationWithBolt {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// set Storm configuration
+		StormConfig config = new StormConfig();
+		config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum));
+		env.getConfig().setGlobalJobParameters(config);
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<String> exclaimed = text
+				.transform("StormBoltTokenizer",
+						TypeExtractor.getForObject(""),
+						new BoltWrapper<String, String>(new ExclamationBolt(),
+								new String[] { Utils.DEFAULT_STREAM_ID }))
+								.map(new ExclamationMap());
+
+		// emit result
+		if (fileOutput) {
+			exclaimed.writeAsText(outputPath);
+		} else {
+			exclaimed.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with bolt tokenizer");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class ExclamationMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 4614754344067170619L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value + "!!!";
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	private static int exclamationNum = 2;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 3) {
+				textPath = args[0];
+				outputPath = args[1];
+				exclamationNum = Integer.parseInt(args[2]);
+			} else {
+				System.err.println("Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing ExclamationWithBolt example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		}
+
+		return env.fromElements(WordCountData.WORDS);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
new file mode 100644
index 0000000..22938e5
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.util.FiniteFileSpout;
+import org.apache.flink.storm.util.FiniteInMemorySpout;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * Implements the "Exclamation" program that attaches six exclamation marks to every line of a text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationWithSpout &lt;text path&gt; &lt;result path&gt;</code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm spout within a Flink Streaming program</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * <li>how to configure a Spout using StormConfig</li>
+ * </ul>
+ */
+public class ExclamationWithSpout {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<String> exclaimed = text
+				.map(new ExclamationMap())
+				.map(new ExclamationMap());
+
+		// emit result
+		if (fileOutput) {
+			exclaimed.writeAsText(outputPath);
+		} else {
+			exclaimed.print();
+		}
+
+		// execute program
+		env.execute("Streaming Exclamation with Storm spout source");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static class ExclamationMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = -684993133807698042L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value + "!!!";
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: ExclamationWithSpout <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing ExclamationWithSpout example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: ExclamationWithSpout <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			final String[] tokens = textPath.split(":");
+			final String inputFile = tokens[tokens.length - 1];
+
+			// set Storm configuration
+			StormConfig config = new StormConfig();
+			config.put(FiniteFileSpout.INPUT_FILE_PATH, inputFile);
+			env.getConfig().setGlobalJobParameters(config);
+
+			return env.addSource(
+					new SpoutWrapper<String>(new FiniteFileSpout(),
+							new String[] { Utils.DEFAULT_STREAM_ID }),
+							TypeExtractor.getForClass(String.class)).setParallelism(1);
+		}
+
+		return env.addSource(
+				new SpoutWrapper<String>(new FiniteInMemorySpout(
+						WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
+						TypeExtractor.getForClass(String.class)).setParallelism(1);
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
new file mode 100644
index 0000000..cfc49a1
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation.operators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+public class ExclamationBolt implements IRichBolt {
+	private final static long serialVersionUID = -6364882114201311380L;
+
+	public final static String EXCLAMATION_COUNT = "exclamation.count";
+
+	private OutputCollector collector;
+	private String exclamation;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+		this.collector = collector;
+
+		Object count = conf.get(EXCLAMATION_COUNT);
+		if (count != null) {
+			int exclamationNum = (Integer) count;
+			StringBuilder builder = new StringBuilder();
+			for (int index = 0; index < exclamationNum; ++index) {
+				builder.append('!');
+			}
+			this.exclamation = builder.toString();
+		} else {
+			this.exclamation = "!";
+		}
+	}
+
+	@Override
+	public void cleanup() {
+	}
+
+	@Override
+	public void execute(Tuple tuple) {
+		collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation));
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("word"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
new file mode 100644
index 0000000..560fe51
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.split.operators.RandomSpout;
+import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements a simple example with two declared output streams for the embedded spout.
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>handle multiple output stream of a spout</li>
+ * <li>accessing each stream by .split(...) and .select(...)</li>
+ * <li>strip wrapper data type SplitStreamType for further processing in Flink</li>
+ * </ul>
+ * <p/>
+ * This example would work the same way for multiple bolt output streams.
+ */
+public class SpoutSplitExample {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		String[] rawOutputs = new String[] { RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM };
+
+		final DataStream<SplitStreamType<Integer>> numbers = env.addSource(
+				new SpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
+						rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));
+
+		SplitStream<SplitStreamType<Integer>> splitStream = numbers
+				.split(new StormStreamSelector<Integer>());
+
+		DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
+		DataStream<SplitStreamType<Integer>> oddStream = splitStream.select(RandomSpout.ODD_STREAM);
+
+		evenStream.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich("even")).print();
+		oddStream.transform("oddBolt",
+				TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+				new BoltWrapper<SplitStreamType<Integer>, Tuple2<String, Integer>>(
+						new VerifyAndEnrichBolt(false)))
+						.print();
+
+		// execute program
+		env.execute("Spout split stream example");
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Same as {@link VerifyAndEnrichBolt}.
+	 */
+	private final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 5213888269197438892L;
+		private final Tuple2<String, Integer> out;
+
+		public Enrich(String token) {
+			this.out = new Tuple2<String, Integer>(token, 0);
+		}
+
+		@Override
+		public Tuple2<String, Integer> map(Integer value) throws Exception {
+			this.out.setField(value, 1);
+			return this.out;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
new file mode 100644
index 0000000..d315395
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split.operators;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class RandomSpout extends BaseRichSpout {
+	private static final long serialVersionUID = -3978554318742509334L;
+
+	public static final String EVEN_STREAM = "even";
+	public static final String ODD_STREAM = "odd";
+
+	private final boolean split;
+	private Random r = new Random();
+	private SpoutOutputCollector collector;
+
+	public RandomSpout(boolean split, long seed) {
+		this.split = split;
+		this.r = new Random(seed);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void nextTuple() {
+		int i = r.nextInt();
+		if (split) {
+			if (i % 2 == 0) {
+				this.collector.emit(EVEN_STREAM, new Values(i));
+			} else {
+				this.collector.emit(ODD_STREAM, new Values(i));
+			}
+		} else {
+			this.collector.emit(new Values(i));
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		Fields schema = new Fields("number");
+		if (split) {
+			declarer.declareStream(EVEN_STREAM, schema);
+			declarer.declareStream(ODD_STREAM, schema);
+		} else {
+			declarer.declare(schema);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
new file mode 100644
index 0000000..99fec4d
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split.operators;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class VerifyAndEnrichBolt extends BaseRichBolt {
+	private static final long serialVersionUID = -7277395570966328721L;
+
+	private final boolean evenOrOdd; // true: even -- false: odd
+	private final String token;
+	private OutputCollector collector;
+
+	public VerifyAndEnrichBolt(boolean evenOrOdd) {
+		this.evenOrOdd = evenOrOdd;
+		this.token = evenOrOdd ? "even" : "odd";
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		if ((input.getInteger(0) % 2 == 0) != this.evenOrOdd) {
+			throw new RuntimeException("Invalid number detected.");
+		}
+		this.collector.emit(new Values(this.token, input.getInteger(0)));
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("evenOrOdd", "number"));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
new file mode 100644
index 0000000..a6c61d4
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+import java.util.Map;
+
+/**
+ * Implements a sink that write the received data so some external output. The result is formatted like
+ * {@code (a1, a2, ..., an)} with {@code Object.toString()} for each attribute).
+ */
+public abstract class AbstractBoltSink implements IRichBolt {
+	private static final long serialVersionUID = -1626323806848080430L;
+
+	private StringBuilder lineBuilder;
+	private String prefix = "";
+	private final OutputFormatter formatter;
+
+	public AbstractBoltSink(final OutputFormatter formatter) {
+		this.formatter = formatter;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public final void prepare(final Map stormConf, final TopologyContext context,
+			final OutputCollector collector) {
+		this.prepareSimple(stormConf, context);
+		if (context.getComponentCommon(context.getThisComponentId()).get_parallelism_hint() > 1) {
+			this.prefix = context.getThisTaskId() + "> ";
+		}
+	}
+
+	protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context);
+
+	@Override
+	public final void execute(final Tuple input) {
+		this.lineBuilder = new StringBuilder();
+		this.lineBuilder.append(this.prefix);
+		this.lineBuilder.append(this.formatter.format(input));
+		this.writeExternal(this.lineBuilder.toString());
+	}
+
+	protected abstract void writeExternal(final String line);
+
+	@Override
+	public void cleanup() {/* nothing to do */}
+
+	@Override
+	public final void declareOutputFields(final OutputFieldsDeclarer declarer) {/* nothing to do */}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
new file mode 100644
index 0000000..d19ffbf
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+import java.util.Map;
+
+/**
+ * Base class for Spouts that read data line by line from an arbitrary source. The declared output schema has a single
+ * attribute called {@code line} and should be of type {@link String}.
+ */
+public abstract class AbstractLineSpout implements IRichSpout {
+	private static final long serialVersionUID = 8876828403487806771L;
+
+	public final static String ATTRIBUTE_LINE = "line";
+
+	protected SpoutOutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void close() {/* noting to do */}
+
+	@Override
+	public void activate() {/* noting to do */}
+
+	@Override
+	public void deactivate() {/* noting to do */}
+
+	@Override
+	public void ack(final Object msgId) {/* noting to do */}
+
+	@Override
+	public void fail(final Object msgId) {/* noting to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(ATTRIBUTE_LINE));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
new file mode 100644
index 0000000..5cd3f68
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.task.TopologyContext;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a sink that write the received data to the given file (as a result of {@code Object.toString()} for each
+ * attribute).
+ */
+public final class BoltFileSink extends AbstractBoltSink {
+	private static final long serialVersionUID = 2014027288631273666L;
+
+	private final String path;
+	private BufferedWriter writer;
+
+	public BoltFileSink(final String path) {
+		this(path, new SimpleOutputFormatter());
+	}
+
+	public BoltFileSink(final String path, final OutputFormatter formatter) {
+		super(formatter);
+		this.path = path;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepareSimple(final Map stormConf, final TopologyContext context) {
+		try {
+			this.writer = new BufferedWriter(new FileWriter(this.path));
+		} catch (final IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void writeExternal(final String line) {
+		try {
+			this.writer.write(line + "\n");
+		} catch (final IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		if (this.writer != null) {
+			try {
+				this.writer.close();
+			} catch (final IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
new file mode 100644
index 0000000..044246b
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.task.TopologyContext;
+
+import java.util.Map;
+
+/**
+ * Implements a sink that prints the received data to {@code stdout}.
+ */
+public final class BoltPrintSink extends AbstractBoltSink {
+	private static final long serialVersionUID = -6650011223001009519L;
+
+	public BoltPrintSink(OutputFormatter formatter) {
+		super(formatter);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepareSimple(final Map stormConf, final TopologyContext context) {
+		/* nothing to do */
+	}
+
+	@Override
+	public void writeExternal(final String line) {
+		System.out.println(line);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
new file mode 100644
index 0000000..1126a2a
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a Spout that reads data from a given local file.
+ */
+public class FileSpout extends AbstractLineSpout {
+	private static final long serialVersionUID = -6996907090003590436L;
+
+	public final static String INPUT_FILE_PATH = "input.path";
+
+	protected String path = null;
+	protected BufferedReader reader;
+
+	public FileSpout() {}
+
+	public FileSpout(final String path) {
+		this.path = path;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+		super.open(conf, context, collector);
+
+		Object configuredPath = conf.get(INPUT_FILE_PATH);
+		if(configuredPath != null) {
+			this.path = (String)configuredPath;
+		}
+
+		try {
+			this.reader = new BufferedReader(new FileReader(this.path));
+		} catch (final FileNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void close() {
+		if (this.reader != null) {
+			try {
+				this.reader.close();
+			} catch (final IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+
+	@Override
+	public void nextTuple() {
+		String line;
+		try {
+			line = this.reader.readLine();
+			if (line != null) {
+				this.collector.emit(new Values(line));
+			}
+		} catch (final IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
new file mode 100644
index 0000000..75450c4
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.flink.storm.util.FiniteSpout;
+
+/**
+ * Implements a Spout that reads data from a given local file. The spout stops automatically
+ * when it reached the end of the file.
+ */
+public class FiniteFileSpout extends FileSpout implements FiniteSpout {
+	private static final long serialVersionUID = -1472978008607215864L;
+
+	private String line;
+	private boolean newLineRead;
+
+	public FiniteFileSpout() {}
+
+	public FiniteFileSpout(String path) {
+		super(path);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+		super.open(conf, context, collector);
+		newLineRead = false;
+	}
+
+	@Override
+	public void nextTuple() {
+		this.collector.emit(new Values(line));
+		newLineRead = false;
+	}
+
+	/**
+	 * Can be called before nextTuple() any times including 0.
+	 */
+	@Override
+	public boolean reachedEnd() {
+		try {
+			readLine();
+		} catch (IOException e) {
+			throw new RuntimeException("Exception occured while reading file " + path);
+		}
+		return line == null;
+	}
+
+	private void readLine() throws IOException {
+		if (!newLineRead) {
+			line = reader.readLine();
+			newLineRead = true;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
new file mode 100644
index 0000000..1490872
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import org.apache.flink.storm.util.FiniteSpout;
+
+
+/**
+ * Implements a Spout that reads String[] data stored in memory. The Spout stops automatically when it emitted all of
+ * the data.
+ */
+public class FiniteInMemorySpout extends InMemorySpout<String> implements FiniteSpout {
+	private static final long serialVersionUID = -4008858647468647019L;
+
+	public FiniteInMemorySpout(String[] source) {
+		super(source);
+	}
+
+	@Override
+	public boolean reachedEnd() {
+		return counter >= source.length;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
new file mode 100644
index 0000000..5e4c7ba
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Values;
+
+/**
+ * Implements a Spout that reads data stored in memory.
+ */
+public class InMemorySpout<T> extends AbstractLineSpout {
+	private static final long serialVersionUID = -4008858647468647019L;
+
+	protected T[] source;
+	protected int counter = 0;
+
+	public InMemorySpout(T[] source) {
+		this.source = source;
+	}
+
+	@Override
+	public void nextTuple() {
+		if (this.counter < source.length) {
+			this.collector.emit(new Values(source[this.counter++]));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
new file mode 100644
index 0000000..e696f9b
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface OutputFormatter extends Serializable {
+
+	/**
+	 * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output tuples before writing
+	 * them out to a file or to the console.
+	 * 
+	 * @param input
+	 *            The tuple to be formatted
+	 * @return The string result of the formatting
+	 */
+	public String format(Tuple input);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
new file mode 100644
index 0000000..cef0081
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class SimpleOutputFormatter implements OutputFormatter {
+	private static final long serialVersionUID = 6349573860144270338L;
+
+	/**
+	 * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that field. This method is
+	 * used for formatting raw outputs wrapped in tuples, before writing them out to a file or to the console.
+	 * 
+	 * @param input
+	 *            The tuple to be formatted
+	 * @return The string result of the formatting
+	 */
+	@Override
+	public String format(final Tuple input) {
+		if (input.getValues().size() != 1) {
+			throw new RuntimeException("The output is not raw");
+		}
+		return input.getValue(0).toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
new file mode 100644
index 0000000..5d7ba53
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class TupleOutputFormatter implements OutputFormatter {
+	private static final long serialVersionUID = -599665757723851761L;
+
+	@Override
+	public String format(final Tuple input) {
+		final StringBuilder stringBuilder = new StringBuilder();
+		stringBuilder.append("(");
+		for (final Object attribute : input.getValues()) {
+			stringBuilder.append(attribute);
+			stringBuilder.append(",");
+		}
+		stringBuilder.replace(stringBuilder.length() - 1, stringBuilder.length(), ")");
+		return stringBuilder.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
new file mode 100644
index 0000000..aa3a075
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Bolt within a Flink Streaming program.</li>
+ * </ul>
+ */
+public class BoltTokenizerWordCount {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<Tuple2<String, Integer>> counts = text
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				// this is done by a bolt that is wrapped accordingly
+				.transform("BoltTokenizer",
+						TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+						new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()))
+				// group by the tuple field "0" and sum up tuple field "1"
+				.keyBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with bolt tokenizer");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing BoltTokenizerWordCount example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: BoltTokenizerWordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		}
+
+		return env.fromElements(WordCountData.WORDS);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
new file mode 100644
index 0000000..f72acb3
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName;
+import org.apache.flink.storm.wordcount.operators.WordCountDataPojos;
+import org.apache.flink.storm.wordcount.operators.WordCountDataPojos.Sentence;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount}
+ * the tokenizer's input is a POJO type and the single field is accessed by name.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>how to access attributes by name within a Bolt for POJO type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountPojo {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<Sentence> text = getTextDataStream(env);
+
+		final DataStream<Tuple2<String, Integer>> counts = text
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				// this is done by a bolt that is wrapped accordingly
+				.transform("BoltTokenizerPojo",
+						TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+						new BoltWrapper<Sentence, Tuple2<String, Integer>>(new BoltTokenizerByName()))
+				// group by the tuple field "0" and sum up tuple field "1"
+				.keyBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with POJO bolt tokenizer");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: BoltTokenizerWordCountPojo <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out
+					.println("Executing BoltTokenizerWordCountPojo example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: BoltTokenizerWordCountPojo <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor
+					.getForObject(new Sentence(""));
+			return env.createInput(new CsvInputFormat<Sentence>(new Path(
+					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+					CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+					sourceType);
+		}
+
+		return env.fromElements(WordCountDataPojos.SENTENCES);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
new file mode 100644
index 0000000..7617e95
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName;
+import org.apache.flink.storm.wordcount.operators.WordCountDataTuple;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount}
+ * the tokenizer's input is a {@link Tuple} type and the single field is accessed by name.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>how to access attributes by name within a Bolt for {@link Tuple} type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountWithNames {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<Tuple1<String>> text = getTextDataStream(env);
+
+		final DataStream<Tuple2<String, Integer>> counts = text
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				// this is done by a Storm bolt that is wrapped accordingly
+				.transform(
+						"BoltTokenizerWithNames",
+						TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+						new BoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(
+								new BoltTokenizerByName(), new Fields("sentence")))
+				// group by the tuple field "0" and sum up tuple field "1"
+				.keyBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with schema bolt tokenizer");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: BoltTokenizerWordCountWithNames <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing BoltTokenizerWordCountWithNames example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: BoltTokenizerWordCountWithNames <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
+					.getForObject(new Tuple1<String>(""));
+			return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
+					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+					CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+					sourceType);
+		}
+
+		return env.fromElements(WordCountDataTuple.TUPLES);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
new file mode 100644
index 0000000..bb451fe
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
+import org.apache.flink.storm.wordcount.operators.WordCountInMemorySpout;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The used data source is a {@link IRichSpout Spout}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Spout within a Flink Streaming program.</li>
+ * </ul>
+ */
+public class SpoutSourceWordCount {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<String> text = getTextDataStream(env);
+
+		final DataStream<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.keyBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with spout source");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function
+	 * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(final String value, final Collector<Tuple2<String, Integer>> out) throws Exception {
+			// normalize and split the line
+			final String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (final String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: SpoutSourceWordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing SpoutSourceWordCount example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: SpoutSourceWordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			final String[] tokens = textPath.split(":");
+			final String localFile = tokens[tokens.length - 1];
+			return env.addSource(
+					new SpoutWrapper<String>(new WordCountFileSpout(localFile),
+							new String[] { Utils.DEFAULT_STREAM_ID }, -1),
+							TypeExtractor.getForClass(String.class)).setParallelism(1);
+		}
+
+		return env.addSource(
+				new SpoutWrapper<String>(new WordCountInMemorySpout(),
+						new String[] { Utils.DEFAULT_STREAM_ID }, -1),
+						TypeExtractor.getForClass(String.class)).setParallelism(1);
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
new file mode 100644
index 0000000..18f49c1
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm {@link LocalCluster}.
+ * <p/>
+ * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
+ * via Flink command line clients (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCountLocal &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink</li>
+ * </ul>
+ */
+public class WordCountLocal {
+	public final static String topologyId = "Storm WordCount";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!WordCountTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+
+		// execute program locally
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, null, builder.createTopology());
+
+		Utils.sleep(10 * 1000);
+
+		// TODO kill does no do anything so far
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
+	}
+
+}