You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:32 UTC

[07/15] flink git commit: [Storm Compatibility] Maven module restucturing and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
deleted file mode 100644
index f965a28..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import java.io.Serializable;
-
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-
-public class WordCountDataPojos {
-	public static Sentence[] SENTENCES;
-
-	static {
-		SENTENCES = new Sentence[WordCountData.WORDS.length];
-		for (int i = 0; i < SENTENCES.length; ++i) {
-			SENTENCES[i] = new Sentence(WordCountData.WORDS[i]);
-		}
-	}
-
-	public static class Sentence implements Serializable {
-		private static final long serialVersionUID = -7336372859203407522L;
-
-		private String sentence;
-
-		public Sentence() {
-		}
-
-		public Sentence(String sentence) {
-			this.sentence = sentence;
-		}
-
-		public String getSentence() {
-			return sentence;
-		}
-
-		public void setSentence(String sentence) {
-			this.sentence = sentence;
-		}
-
-		@Override
-		public String toString() {
-			return "(" + this.sentence + ")";
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
deleted file mode 100644
index 732f0ae..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-
-@SuppressWarnings("unchecked")
-public class WordCountDataTuple {
-	public static Tuple1<String>[] TUPLES;
-
-	static {
-		TUPLES = new Tuple1[WordCountData.WORDS.length];
-		for (int i = 0; i < TUPLES.length; ++i) {
-			TUPLES[i] = new Tuple1<String>(WordCountData.WORDS[i]);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
deleted file mode 100644
index dd6d0d9..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class for Storm tests.
- */
-public abstract class StormTestBase extends AbstractTestBase {
-	
-	public static final int DEFAULT_PARALLELISM = 4;
-	
-	public StormTestBase() {
-		this(new Configuration());
-	}
-	
-	public StormTestBase(Configuration config) {
-		super(config, StreamingMode.STREAMING);
-		setTaskManagerNumSlots(DEFAULT_PARALLELISM);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Methods to create the test program and for pre- and post- test work
-	// ------------------------------------------------------------------------
-
-	protected abstract void testProgram() throws Exception;
-
-	protected void preSubmit() throws Exception {}
-
-	protected void postSubmit() throws Exception {}
-
-	// ------------------------------------------------------------------------
-	//  Test entry point
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testJob() throws Exception {
-		try {
-			// pre-submit
-			try {
-				preSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Pre-submit work caused an error: " + e.getMessage());
-			}
-
-			// prepare the test environment
-			startCluster();
-
-			// we need to initialize the stream test environment, and the storm local cluster
-			TestStreamEnvironment.setAsContext(this.executor, DEFAULT_PARALLELISM);
-			
-			FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() {
-				@Override
-				public FlinkLocalCluster createLocalCluster() {
-					return new FlinkLocalCluster(executor);
-				}
-			});
-
-			// call the test program
-			try {
-				testProgram();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Error while calling the test program: " + e.getMessage());
-			}
-
-			// post-submit
-			try {
-				postSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Post-submit work caused an error: " + e.getMessage());
-			}
-		}
-		finally {
-			// reset the FlinkLocalCluster to its default behavior
-			FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory());
-			
-			// reset the StreamExecutionEnvironment to its default behavior
-			TestStreamEnvironment.unsetAsContext();
-			
-			// clean up all resources
-			stopCluster();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
deleted file mode 100644
index a858f36..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormBolt;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class ExclamationWithStormBoltITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-	protected String exclamationNum;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-		this.exclamationNum = "3";
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
deleted file mode 100644
index 2a8ac24..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormSpout;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class ExclamationWithStormSpoutITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExclamationWithStormSpout.main(new String[]{this.textPath, this.resultPath});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
deleted file mode 100644
index a19f3af..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.stormcompatibility.excamation.StormExclamationLocal;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormExclamationLocalITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-	protected String exclamationNum;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-		this.exclamationNum = "3";
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StormExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
deleted file mode 100644
index 8b823b5..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation.util;
-
-public class ExclamationData {
-
-	public static final String TEXT_WITH_EXCLAMATIONS =
-			"Goethe - Faust: Der Tragoedie erster Teil!!!!!!\n"
-					+ "Prolog im Himmel.!!!!!!\n"
-					+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei!!!!!!\n"
-					+ "Erzengel treten vor.!!!!!!\n"
-					+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,!!!!!!\n"
-					+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick!!!!!!\n"
-					+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich!!!!!!\n"
-					+ "hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n"
-					+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde!!!!!!\n"
-					+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es!!!!!!\n"
-					+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und!!!!!!\n"
-					+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.!!!!!!\n"
-					+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land!!!!!!\n"
-					+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.!!!!!!\n"
-					+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch!!!!!!\n"
-					+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.!!!!!!\n"
-					+ "ZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden!!!!!!\n"
-					+ "mag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n"
-					+ "MEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie!!!!!!\n"
-					+ "alles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So!!!!!!\n"
-					+ "siehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte!!!!!!\n"
-					+ "machen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte!!!!!!\n"
-					+ "dich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von!!!!!!\n"
-					+ "Sonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die!!!!!!\n"
-					+ "Menschen plagen. Der kleine Gott der Welt bleibt stets von gleichem!!!!!!\n"
-					+ "Schlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser!!!!!!\n"
-					+ "wuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;!!!!!!\n"
-					+ "Er nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier!!!!!!\n"
-					+ "zu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der!!!!!!\n"
-					+ "langbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im!!!!!!\n"
-					+ "Gras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In!!!!!!\n"
-					+ "jeden Quark begraebt er seine Nase.!!!!!!\n"
-					+ "DER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer!!!!!!\n"
-					+ "anzuklagen? Ist auf der Erde ewig dir nichts recht?!!!!!!\n"
-					+ "MEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich!!!!!!\n"
-					+ "schlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar!!!!!!\n"
-					+ "die armen selbst nicht plagen.!!!!!!\n" + "DER HERR: Kennst du den Faust?!!!!!!\n"
-					+ "MEPHISTOPHELES: Den Doktor?!!!!!!\n"
-					+ "DER HERR: Meinen Knecht!!!!!!!\n"
-					+ "MEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch!!!!!!\n"
-					+ "ist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er!!!!!!\n"
-					+ "ist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten!!!!!!\n"
-					+ "Sterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne!!!!!!\n"
-					+ "Befriedigt nicht die tiefbewegte Brust.!!!!!!\n"
-					+ "DER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in!!!!!!\n"
-					+ "die Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das!!!!!!\n"
-					+ "Bluet und Frucht die kuenft'gen Jahre zieren.!!!!!!\n"
-					+ "MEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr!!!!!!\n"
-					+ "mir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.!!!!!!\n"
-					+ "DER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,!!!!!!\n"
-					+ "Es irrt der Mensch so lang er strebt.!!!!!!\n"
-					+ "MEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals!!!!!!\n"
-					+ "gern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer!!!!!!\n"
-					+ "einem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.!!!!!!\n"
-					+ "DER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem!!!!!!\n"
-					+ "Urquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit!!!!!!\n"
-					+ "herab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in!!!!!!\n"
-					+ "seinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.!!!!!!\n"
-					+ "MEPHISTOPHELES: Schon gut! nur dauert es nicht lange. Mir ist fuer meine!!!!!!\n"
-					+ "Wette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir!!!!!!\n"
-					+ "Triumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine!!!!!!\n"
-					+ "Muhme, die beruehmte Schlange.!!!!!!\n"
-					+ "DER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen!!!!!!\n"
-					+ "nie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am!!!!!!\n"
-					+ "wenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,!!!!!!\n"
-					+ "er liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen!!!!!!\n"
-					+ "zu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten!!!!!!\n"
-					+ "Goettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das!!!!!!\n"
-					+ "ewig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was!!!!!!\n"
-					+ "in schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!!!!!!!\n"
-					+ "(Der Himmel schliesst, die Erzengel verteilen sich.)!!!!!!\n"
-					+ "MEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und!!!!!!\n"
-					+ "huete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,!!!!!!\n"
-					+ "So menschlich mit dem Teufel selbst zu sprechen.!!!!!!";
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java
deleted file mode 100644
index 305245b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.junit.Test;
-
-public class BoltSplitITCase {
-
-	@Test
-	public void testTopology() throws Exception {
-		StormSplitStreamBoltLocal.main(new String[] { "0", "/dev/null" });
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java
deleted file mode 100644
index c40e054..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class SplitBolt extends BaseRichBolt {
-	private static final long serialVersionUID = -6627606934204267173L;
-
-	public static final String EVEN_STREAM = "even";
-	public static final String ODD_STREAM = "odd";
-
-	private OutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		this.collector = collector;
-
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		if (input.getInteger(0) % 2 == 0) {
-			this.collector.emit(EVEN_STREAM, new Values(input.getInteger(0)));
-		} else {
-			this.collector.emit(ODD_STREAM, new Values(input.getInteger(0)));
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		Fields schema = new Fields("number");
-		declarer.declareStream(EVEN_STREAM, schema);
-		declarer.declareStream(ODD_STREAM, schema);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
deleted file mode 100644
index 5f637d3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
-import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
-
-public class SplitBoltTopology {
-	public final static String spoutId = "randomSource";
-	public final static String boltId = "splitBolt";
-	public final static String evenVerifierId = "evenVerifier";
-	public final static String oddVerifierId = "oddVerifier";
-	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static FlinkTopologyBuilder buildTopology() {
-		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-		builder.setSpout(spoutId, new RandomSpout(false, seed));
-		builder.setBolt(boltId, new SplitBolt()).shuffleGrouping(spoutId);
-		builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(boltId,
-				SplitBolt.EVEN_STREAM);
-		builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(boltId,
-				SplitBolt.ODD_STREAM);
-
-		// emit result
-		if (outputPath != null) {
-			// read the text file from given input path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
-					.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		} else {
-			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
-					.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static long seed = System.currentTimeMillis();
-	private static String outputPath = null;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 2) {
-				seed = Long.parseLong(args[0]);
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: SplitBoltTopology <seed> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing SplitBoltTopology example with random data");
-			System.out.println("  Usage: SplitBoltTopology <seed> <result path>");
-		}
-
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java
deleted file mode 100644
index 613fd10..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
-import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
-
-public class SplitSpoutTopology {
-	public final static String spoutId = "randomSplitSource";
-	public final static String evenVerifierId = "evenVerifier";
-	public final static String oddVerifierId = "oddVerifier";
-	public final static String sinkId = "sink";
-	private final static OutputFormatter formatter = new TupleOutputFormatter();
-
-	public static FlinkTopologyBuilder buildTopology() {
-		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-		builder.setSpout(spoutId, new RandomSpout(true, seed));
-		builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId,
-				RandomSpout.EVEN_STREAM);
-		builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(spoutId,
-				RandomSpout.ODD_STREAM);
-
-		// emit result
-		if (outputPath != null) {
-			// read the text file from given input path
-			final String[] tokens = outputPath.split(":");
-			final String outputFile = tokens[tokens.length - 1];
-			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
-			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		} else {
-			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
-			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
-		}
-
-		return builder;
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static long seed = System.currentTimeMillis();
-	private static String outputPath = null;
-
-	static boolean parseParameters(final String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 2) {
-				seed = Long.parseLong(args[0]);
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: SplitSpoutTopology <seed> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing SplitSpoutTopology example with random data");
-			System.out.println("  Usage: SplitSpoutTopology <seed> <result path>");
-		}
-
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java
deleted file mode 100644
index f30e160..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.junit.Test;
-
-public class SpoutSplitITCase {
-
-	@Test
-	public void testTopology() throws Exception {
-		StormSplitStreamSpoutLocal.main(new String[] { "0", "/dev/null" });
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java
deleted file mode 100644
index 028f6d1..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-import backtype.storm.utils.Utils;
-
-public class StormSplitStreamBoltLocal {
-	public final static String topologyId = "Bolt split stream example";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!SplitBoltTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = SplitBoltTopology.buildTopology();
-
-		// execute program locally
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
-
-		Utils.sleep(5 * 1000);
-
-		// TODO kill does no do anything so far
-		cluster.killTopology(topologyId);
-		cluster.shutdown();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java
deleted file mode 100644
index cc5acd9..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.split;
-
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-import backtype.storm.utils.Utils;
-
-public class StormSplitStreamSpoutLocal {
-	public final static String topologyId = "Spout split stream example";
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!SplitSpoutTopology.parseParameters(args)) {
-			return;
-		}
-
-		// build Topology the Storm way
-		final FlinkTopologyBuilder builder = SplitSpoutTopology.buildTopology();
-
-		// execute program locally
-		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
-
-		Utils.sleep(5 * 1000);
-
-		// TODO kill does no do anything so far
-		cluster.killTopology(topologyId);
-		cluster.shutdown();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
deleted file mode 100644
index c9516ff..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class BoltTokenizerWordCountITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		BoltTokenizerWordCount.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
deleted file mode 100644
index 351014e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class BoltTokenizerWordCountPojoITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		BoltTokenizerWordCountPojo.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
deleted file mode 100644
index c2ed088..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		BoltTokenizerWordCountWithNames.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
deleted file mode 100644
index 93361c5..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class SpoutSourceWordCountITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		SpoutSourceWordCount.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
deleted file mode 100644
index 6b51cbd..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormWordCountLocalITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StormWordCountLocal.main(new String[]{this.textPath, this.resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
deleted file mode 100644
index a9e9884..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import org.apache.flink.stormcompatibility.api.StormTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormWordCountLocalNamedITCase extends StormTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
-		this.resultPath = this.getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StormWordCountNamedLocal.main(new String[] { this.textPath, this.resultPath });
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j.properties b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j.properties
deleted file mode 100644
index ed2bbcb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=OFF, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/logback-test.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/logback-test.xml
deleted file mode 100644
index 4f56748..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/pom.xml b/flink-contrib/flink-storm-compatibility/pom.xml
deleted file mode 100644
index 803336f..0000000
--- a/flink-contrib/flink-storm-compatibility/pom.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-contrib-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-storm-compatibility-parent</artifactId>
-	<name>flink-storm-compatibility</name>
-	<packaging>pom</packaging>
-
-	<modules>
-		<module>flink-storm-compatibility-core</module>
-		<module>flink-storm-compatibility-examples</module>
-	</modules>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/README.md b/flink-contrib/flink-storm-examples/README.md
new file mode 100644
index 0000000..c3247f6
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/README.md
@@ -0,0 +1,20 @@
+# flink-storm-examples
+
+This module contains multiple versions of a simple Word-Count example to illustrate the usage of the compatibility layer:
+* the usage of spouts and bolts within a regular Flink streaming program (ie, embedded mode)
+   1. `SpoutSourceWordCount` uses a spout as data source within a Flink streaming program
+   2. `BoltTokenizeerWordCount` uses a bolt to split sentences into words within a Flink streaming program
+      * `BoltTokenizeerWordCountWithNames` used `Tuple` input type and accesses attributes by field names (rather than index)
+      * `BoltTokenizeerWordCountPOJO` used POJO input type and accesses attributes by field names (rather than index)
+
+* how to submit a whole Storm topology to Flink
+   3. `WordCountTopology` plugs a Storm topology together
+      * `StormWordCountLocal` submits the topology to a local Flink cluster (similiar to a `LocalCluster` in Storm)
+        (`WordCountLocalByName` accesses attributes by field names rather than index)
+      * `WordCountRemoteByClient` submits the topology to a remote Flink cluster (simliar to the usage of `NimbusClient` in Storm)
+      * `WordCountRemoteBySubmitter` submits the topology to a remote Flink cluster (simliar to the usage of `StormSubmitter` in Storm)
+
+Additionally, this module package the three example Word-Count programs as jar files to be submitted to a Flink cluster via `bin/flink run example.jar`.
+(Valid jars are `WordCount-SpoutSource.jar`, `WordCount-BoltTokenizer.jar`, and `WordCount-StormTopology.jar`)
+
+The package `org.apache.flink.storm.wordcount.operators` contains original spouts and bolts that can be used unmodified within Storm or Flink.

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
new file mode 100644
index 0000000..de6c0cb
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -0,0 +1,364 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-contrib-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-storm-examples</artifactId>
+	<name>flink-storm-examples</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-storm</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- get default data from flink-java-examples package -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>unpack</id>
+						<phase>prepare-package</phase>
+						<goals>
+							<goal>unpack</goal>
+						</goals>
+						<configuration>
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-java-examples</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-storm</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.storm</groupId>
+									<artifactId>storm-core</artifactId>
+									<version>0.9.4</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<!-- need to exclude to be able to run
+									       * StormWordCountRemoteByClient and
+									       * StormWordCountRemoteBySubmitter
+									     within Eclipse -->
+									<excludes>defaults.yaml</excludes>
+								</artifactItem>
+								<artifactItem>
+									<groupId>com.googlecode.json-simple</groupId>
+									<artifactId>json-simple</artifactId>
+									<version>1.1</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.yaml</groupId>
+									<artifactId>snakeyaml</artifactId>
+									<version>1.11</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- self-contained jars for each example -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+
+				<executions>
+
+					<!-- WordCount Spout source-->
+					<!-- example for embedded spout - for whole topologies see "WordCount Storm topology" example below -->
+					<execution>
+						<id>WordCount-SpoutSource</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<finalName>WordCount</finalName>
+							<classifier>SpoutSource</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.storm.wordcount.SpoutSourceWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<!-- from storm-core -->
+								<include>backtype/storm/topology/*.class</include>
+								<include>backtype/storm/spout/*.class</include>
+								<include>backtype/storm/task/*.class</include>
+								<include>backtype/storm/tuple/*.class</include>
+								<include>backtype/storm/generated/*.class</include>
+								<include>backtype/storm/metric/**/*.class</include>
+								<include>org/apache/thrift7/**/*.class</include>
+								<!-- Storm's recursive dependencies -->
+								<include>org/json/simple/**/*.class</include>
+								<!-- compatibility layer -->
+								<include>org/apache/flink/storm/api/*.class</include>
+								<include>org/apache/flink/storm/util/*.class</include>
+								<include>org/apache/flink/storm/wrappers/*.class</include>
+								<!-- Word Count -->
+								<include>org/apache/flink/storm/wordcount/SpoutSourceWordCount.class</include>
+								<include>org/apache/flink/storm/wordcount/SpoutSourceWordCount$*.class</include>
+								<include>org/apache/flink/storm/wordcount/operators/WordCountFileSpout.class</include>
+								<include>org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.class</include>
+								<include>org/apache/flink/storm/util/AbstractLineSpout.class</include>
+								<include>org/apache/flink/storm/util/FileSpout.class</include>
+								<include>org/apache/flink/storm/util/InMemorySpout.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCount Bolt tokenizer-->
+					<!-- example for embedded bolt - for whole topologies see "WordCount Storm topology" example below -->
+					<execution>
+						<id>WordCount-BoltTokenizer</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<finalName>WordCount</finalName>
+							<classifier>BoltTokenizer</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.storm.wordcount.BoltTokenizerWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<!-- from storm-core -->
+								<include>backtype/storm/topology/*.class</include>
+								<include>backtype/storm/spout/*.class</include>
+								<include>backtype/storm/task/*.class</include>
+								<include>backtype/storm/tuple/*.class</include>
+								<include>backtype/storm/generated/*.class</include>
+								<include>backtype/storm/metric/**/*.class</include>
+								<include>org/apache/thrift7/**/*.class</include>
+								<!-- Storm's recursive dependencies -->
+								<include>org/json/simple/**/*.class</include>
+								<!-- compatibility layer -->
+								<include>org/apache/flink/storm/api/*.class</include>
+								<include>org/apache/flink/storm/util/*.class</include>
+								<include>org/apache/flink/storm/wrappers/*.class</include>
+								<!-- Word Count -->
+								<include>org/apache/flink/storm/wordcount/BoltTokenizerWordCount.class</include>
+								<include>org/apache/flink/storm/wordcount/operators/BoltTokenizer.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCount Storm topology-->
+					<!-- Example for whole topologies (ie, if FlinkTopologyBuilder is used) -->
+					<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
+					     However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
+					     Thus, 'defaults.yaml' is not available for maven-jar-plugin.
+					     Nevertheless, we register an empty jar with corresponding name, such that the final jar can be installed to local maven repository.
+					     We use maven-shade-plugin to build the actual jar (which will replace the empty jar). -->
+					<execution>
+						<id>WordCount-StormTopology</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<finalName>WordCount</finalName>
+							<classifier>StormTopology</classifier>
+						</configuration>
+					</execution>
+
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- WordCount Storm topology-->
+			<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
+			<!-- Build StormTopolgy jar to overwrite empty jar created with maven-jar-plugin. -->
+			<plugin>
+				<artifactId>maven-shade-plugin</artifactId>
+				<groupId>org.apache.maven.plugins</groupId>
+				<executions>
+					<execution>
+						<id>WordCount-StormTopology</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>WordCount-StormTopology</finalName>
+
+							<artifactSet>
+								<includes>
+									<include>org.apache.storm:storm-core</include>
+									<!-- Storm's recursive dependencies -->
+									<include>org.yaml:snakeyaml</include>
+									<include>com.googlecode.json-simple:json-simple</include>
+									<include>org.apache.flink:flink-storm</include>
+									<include>org.apache.flink:flink-storm-examples</include>
+								</includes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>org.apache.storm:storm-core</artifact>
+									<includes>
+										<include>defaults.yaml</include>
+										<include>backtype/storm/*.class</include>
+										<include>backtype/storm/topology/*.class</include>
+										<include>backtype/storm/spout/*.class</include>
+										<include>backtype/storm/task/*.class</include>
+										<include>backtype/storm/tuple/*.class</include>
+										<include>backtype/storm/generated/*.class</include>
+										<include>backtype/storm/metric/**/*.class</include>
+										<include>backtype/storm/utils/*.class</include>
+										<include>backtype/storm/serialization/*.class</include>
+										<include>org/apache/storm/curator/**/*.class</include>
+										<include>org/apache/thrift7/**/*.class</include>
+										<!-- Storm's recursive dependencies -->
+										<include>org/json/simple/**/*.class</include>
+										<include>org/yaml/snakeyaml/**/*.class</include>
+									</includes>
+								</filter>
+								<filter>
+									<artifact>org.apache.flink:flink-storm-examples</artifact>
+									<includes>
+										<include>org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.class</include>
+										<include>org/apache/flink/storm/wordcount/WordCountTopology.class</include>
+										<include>org/apache/flink/storm/wordcount/operators/*.class</include>
+										<include>org/apache/flink/storm/util/*.class</include>
+										<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+									</includes>
+								</filter>
+								<filter>
+									<artifact>org.apache.flink:flink-storm</artifact>
+									<includes>
+										<include>org/apache/flink/storm/api/*.class</include>
+										<include>org/apache/flink/storm/util/*.class</include>
+										<include>org/apache/flink/storm/wrappers/*.class</include>
+									</includes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-dependency-plugin</artifactId>
+										<versionRange>[2.9,)</versionRange>
+										<goals>
+											<goal>unpack</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+
+	</build>
+
+</project>