You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:04 UTC

[14/27] flink git commit: [storm-compat] adapted layer to new streaming API

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 48f680a..6d2f196 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -45,7 +45,7 @@ public class StormSpoutWrapperTest extends AbstractTest {
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
 
 		spoutWrapper.cancel();
-		final TestCollector collector = new TestCollector();
+		final TestContext collector = new TestContext();
 		spoutWrapper.run(collector);
 
 		Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java
deleted file mode 100644
index 2d85597..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestCollector.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-import java.util.LinkedList;
-
-class TestCollector implements Collector<Tuple1<Integer>> {
-	public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
-
-	public TestCollector() {
-	}
-
-	@Override
-	public void collect(final Tuple1<Integer> record) {
-		this.result.add(record.copy());
-	}
-
-	@Override
-	public void close() {/* nothing to to */}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
new file mode 100644
index 0000000..8885a1b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import java.util.LinkedList;
+
+class TestContext implements SourceContext<Tuple1<Integer>> {
+	public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
+
+	public TestContext() {
+	}
+
+	@Override
+	public void collect(final Tuple1<Integer> record) {
+		this.result.add(record.copy());
+	}
+
+	@Override
+	public Object getCheckpointLock() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
index cae50b7..79c7125 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.stormcompatibility.singlejoin;
 
 import backtype.storm.tuple.Fields;
@@ -27,10 +44,10 @@ public class SingleJoinTopology {
 		builder.setSpout(spoutId2, new AgeSpout(new Fields("id", "age")));
 
 		builder.setBolt(boltId, new SingleJoinBolt(new Fields("gender", "age")))
-				.fieldsGrouping(spoutId1, new Fields("id"))
-				.fieldsGrouping(spoutId2, new Fields("id"));
-				//.shuffleGrouping(spoutId1)
-				//.shuffleGrouping(spoutId2);
+		.fieldsGrouping(spoutId1, new Fields("id"))
+		.fieldsGrouping(spoutId2, new Fields("id"));
+		//.shuffleGrouping(spoutId1)
+		//.shuffleGrouping(spoutId2);
 
 		// emit result
 		if (fileInputOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
index 8d66d29..d70914a 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/StormSingleJoinLocal.java
@@ -1,9 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.stormcompatibility.singlejoin;
 
 import backtype.storm.utils.Utils;
 import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.wordcount.WordCountTopology;
 
 public class StormSingleJoinLocal {
 	public final static String topologyId = "Streaming SingleJoin";

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
index 4edc133..49761c3 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/AgeSpout.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
 
 import backtype.storm.topology.OutputFieldsDeclarer;

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
index ac25917..d507998 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/GenderSpout.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
 
 import backtype.storm.topology.OutputFieldsDeclarer;

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
index 3df1618..cd53140 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/stormoperators/SingleJoinBolt.java
@@ -1,6 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.stormcompatibility.singlejoin.stormoperators;
 
-import backtype.storm.Config;
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
index 939380c..b121744 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/AbstractStormBoltSink.java
@@ -34,9 +34,9 @@ public abstract class AbstractStormBoltSink implements IRichBolt {
 
 	private StringBuilder lineBuilder;
 	private String prefix = "";
-	private OutputFormatter formatter;
+	private final OutputFormatter formatter;
 
-	public AbstractStormBoltSink(OutputFormatter formatter) {
+	public AbstractStormBoltSink(final OutputFormatter formatter) {
 		this.formatter = formatter;
 	}
 
@@ -56,14 +56,7 @@ public abstract class AbstractStormBoltSink implements IRichBolt {
 	public final void execute(final Tuple input) {
 		this.lineBuilder = new StringBuilder();
 		this.lineBuilder.append(this.prefix);
-		lineBuilder.append(formatter.format(input));
-//		this.lineBuilder.append("(");
-//		for (final Object attribute : input.getValues()) {
-//			this.lineBuilder.append(attribute);
-//			this.lineBuilder.append(",");
-//		}
-//		this.lineBuilder.replace(this.lineBuilder.length() - 1, this.lineBuilder.length(), ")");
-
+		this.lineBuilder.append(this.formatter.format(input));
 		this.writeExternal(this.lineBuilder.toString());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
index 0de7b5c..bfc3135 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.stormcompatibility.util;
 
-public interface OutputFormatter {
+import java.io.Serializable;
 
-	public String format(Object input);
+import backtype.storm.tuple.Tuple;
+
+public interface OutputFormatter extends Serializable {
+
+	public String format(Tuple input);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
index fca5fcc..a9d72d9 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.stormcompatibility.util;
 
+import backtype.storm.tuple.Tuple;
+
 public class SimpleOutputFormatter implements OutputFormatter {
+	private static final long serialVersionUID = 6349573860144270338L;
 
 	@Override
-	public String format(Object input) {
-		return input.toString();
+	public String format(final Tuple input) {
+		return input.getValue(0).toString();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
index 86b55aa..a92bc6a 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
@@ -34,7 +34,7 @@ public final class StormBoltFileSink extends AbstractStormBoltSink {
 	private final String path;
 	private BufferedWriter writer;
 
-	public StormBoltFileSink(final String path, OutputFormatter formatter) {
+	public StormBoltFileSink(final String path, final OutputFormatter formatter) {
 		super(formatter);
 		this.path = path;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
index f22fab6..6419ee3 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
@@ -20,16 +20,14 @@ package org.apache.flink.stormcompatibility.util;
 
 import backtype.storm.tuple.Tuple;
 
-import java.io.Serializable;
-
-public class TupleOutputFormatter implements OutputFormatter, Serializable {
+public class TupleOutputFormatter implements OutputFormatter {
+	private static final long serialVersionUID = -599665757723851761L;
 
 	@Override
-	public String format(Object input) {
-		Tuple inputTuple = (Tuple) input;
-		StringBuilder stringBuilder = new StringBuilder();
+	public String format(final Tuple input) {
+		final StringBuilder stringBuilder = new StringBuilder();
 		stringBuilder.append("(");
-		for (final Object attribute : inputTuple.getValues()) {
+		for (final Object attribute : input.getValues()) {
 			stringBuilder.append(attribute);
 			stringBuilder.append(",");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
index a6b64d2..7b4f471 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
@@ -65,11 +65,11 @@ public class StormWordCountLocal {
 		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
 		cluster.submitTopology(topologyId, null, builder.createTopology());
 
-		Utils.sleep(100 * 1000);
+		Utils.sleep(5 * 1000);
 
 		// TODO kill does no do anything so far
-		//cluster.killTopology(topologyId);
-		//cluster.shutdown();
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
index e384f34..75dd5fc 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
@@ -36,7 +36,7 @@ public class StormBoltExclamationITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
index e3813c4..d6bcf30 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -39,7 +39,7 @@ public class StormExclamationLocalITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
index 06c5d9a..2b08b4b 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
@@ -36,7 +36,7 @@ public class StormSpoutExclamationITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		this.compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+		compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
index 2a5a0b3..9228474 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
@@ -34,7 +34,7 @@ public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		this.compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
index d08b560..9d7b869 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
@@ -34,7 +34,7 @@ public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		this.compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
index eeb85ef..2427818 100644
--- a/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
+++ b/flink-staging/flink-streaming/flink-storm-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
@@ -37,7 +37,7 @@ public class StormWordCountLocalITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		this.compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4553c0b..5e7be8d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -956,20 +956,58 @@ public abstract class StreamExecutionEnvironment {
 	 * 		type of the returned stream
 	 * @return the data stream constructed
 	 */
-	@SuppressWarnings("unchecked")
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
+		return addSource(function, sourceName, null);
+	}
 
-		TypeInformation<OUT> typeInfo;
+	/**
+	 * Ads a data source with a custom type information thus opening a
+	 * {@link DataStream}. Only in very special cases does the user need to
+	 * support type information. Otherwise use
+	 * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+	 *
+	 * @param function
+	 * 		the user defined function
+	 * @param <OUT>
+	 * 		type of the returned stream
+	 * @param typeInfo
+	 * 		the user defined type information for the stream
+	 * @return the data stream constructed
+	 */
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo) {
+		return addSource(function, "Custom Source", typeInfo);
+	}
 
-		if (function instanceof ResultTypeQueryable) {
-			typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
-		} else {
-			try {
-				typeInfo = TypeExtractor.createTypeInfo(
-						SourceFunction.class,
-						function.getClass(), 0, null, null);
-			} catch (InvalidTypesException e) {
-				typeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
+	/**
+	 * Ads a data source with a custom type information thus opening a
+	 * {@link DataStream}. Only in very special cases does the user need to
+	 * support type information. Otherwise use
+	 * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+	 *
+	 * @param function
+	 * 		the user defined function
+	 * @param sourceName
+	 * 		Name of the data source
+	 * @param <OUT>
+	 * 		type of the returned stream
+	 * @param typeInfo
+	 * 		the user defined type information for the stream
+	 * @return the data stream constructed
+	 */
+	@SuppressWarnings("unchecked")
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
+
+		if(typeInfo == null) {
+			if (function instanceof ResultTypeQueryable) {
+				typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
+			} else {
+				try {
+					typeInfo = TypeExtractor.createTypeInfo(
+							SourceFunction.class,
+							function.getClass(), 0, null, null);
+				} catch (final InvalidTypesException e) {
+					typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
+				}
 			}
 		}