You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:05 UTC

[15/27] flink git commit: [storm-compat] adapted layer to new streaming API

[storm-compat] adapted layer to new streaming API

  - re-introduced possibility to specify output type for addSource()
  - split StormCollector into Abstract-, Bolt-, and Spout-Collector
additional minor changes:
  - removed unnecessary "unused" tags
  - fixed comment typos
  - added missing license header in SingleJoin example files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e39699f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e39699f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e39699f3

Branch: refs/heads/master
Commit: e39699f3eb41517a7035a5e626c6bb20c6b70501
Parents: 09e5be4
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Wed Jun 10 00:11:45 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Jun 14 23:01:10 2015 +0200

----------------------------------------------------------------------
 .../stormcompatibility/api/FlinkClient.java     |   3 +-
 .../api/FlinkLocalCluster.java                  |   5 -
 .../api/FlinkOutputFieldsDeclarer.java          |  20 +--
 .../stormcompatibility/api/FlinkSubmitter.java  |  11 +-
 .../stormcompatibility/api/FlinkTopology.java   |   3 +-
 .../api/FlinkTopologyBuilder.java               |   3 +-
 .../wrappers/AbstractStormCollector.java        | 111 ++++++++++++++
 .../wrappers/AbstractStormSpoutWrapper.java     |  27 ++--
 .../wrappers/FlinkDummyRichFunction.java        |  52 -------
 .../wrappers/StormBoltCollector.java            |  92 ++++++++++++
 .../wrappers/StormBoltWrapper.java              |  73 ++++------
 .../wrappers/StormCollector.java                | 145 -------------------
 .../wrappers/StormFiniteSpoutWrapper.java       |  10 +-
 .../wrappers/StormOutputFieldsDeclarer.java     |   4 +-
 .../wrappers/StormSpoutCollector.java           |  80 ++++++++++
 .../stormcompatibility/wrappers/StormTuple.java |  12 +-
 .../wrappers/StormWrapperSetupHelper.java       |  28 ++--
 .../wrappers/FiniteTestSpout.java               |   4 +-
 .../wrappers/FlinkDummyRichFunctionTest.java    |  38 -----
 .../wrappers/StormBoltCollectorTest.java        | 101 +++++++++++++
 .../wrappers/StormBoltWrapperTest.java          |  43 ++----
 .../wrappers/StormCollectorTest.java            | 120 ---------------
 .../wrappers/StormFiniteSpoutWrapperTest.java   |  13 +-
 .../wrappers/StormOutputFieldsDeclarerTest.java |   6 +-
 .../wrappers/StormSpoutCollectorTest.java       |  88 +++++++++++
 .../wrappers/StormSpoutWrapperTest.java         |   2 +-
 .../wrappers/TestCollector.java                 |  39 -----
 .../wrappers/TestContext.java                   |  40 +++++
 .../singlejoin/SingleJoinTopology.java          |  25 +++-
 .../singlejoin/StormSingleJoinLocal.java        |  18 ++-
 .../singlejoin/stormoperators/AgeSpout.java     |  17 +++
 .../singlejoin/stormoperators/GenderSpout.java  |  17 +++
 .../stormoperators/SingleJoinBolt.java          |  18 ++-
 .../util/AbstractStormBoltSink.java             |  13 +-
 .../util/OutputFormatter.java                   |   8 +-
 .../util/SimpleOutputFormatter.java             |   8 +-
 .../util/StormBoltFileSink.java                 |   2 +-
 .../util/TupleOutputFormatter.java              |  12 +-
 .../wordcount/StormWordCountLocal.java          |   6 +-
 .../exclamation/StormBoltExclamationITCase.java |   2 +-
 .../StormExclamationLocalITCase.java            |   2 +-
 .../StormSpoutExclamationITCase.java            |   2 +-
 .../wordcount/BoltTokenizerWordCountITCase.java |   2 +-
 .../wordcount/SpoutSourceWordCountITCase.java   |   2 +-
 .../wordcount/StormWordCountLocalITCase.java    |   2 +-
 .../environment/StreamExecutionEnvironment.java |  60 ++++++--
 46 files changed, 805 insertions(+), 584 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 0f11d63..242c154 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -122,7 +122,7 @@ public class FlinkClient {
 	@SuppressWarnings("rawtypes")
 	public static FlinkClient getConfiguredClient(final Map conf) {
 		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
-		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
+		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
 		return new FlinkClient(nimbusHost, nimbusPort);
 	}
 
@@ -133,7 +133,6 @@ public class FlinkClient {
 	 *
 	 * @return A reference to itself.
 	 */
-	@SuppressWarnings("unused")
 	public FlinkClient getClient() {
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
index 7160bc4..e82e97a 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -65,27 +65,22 @@ public class FlinkLocalCluster {
 		ClusterUtil.stopOnMiniCluster();
 	}
 
-	@SuppressWarnings("unused")
 	public String getTopologyConf(final String id) {
 		return null;
 	}
 
-	@SuppressWarnings("unused")
 	public StormTopology getTopology(final String id) {
 		return null;
 	}
 
-	@SuppressWarnings("unused")
 	public ClusterSummary getClusterInfo() {
 		return null;
 	}
 
-	@SuppressWarnings("unused")
 	public TopologyInfo getTopologyInfo(final String id) {
 		return null;
 	}
 
-	@SuppressWarnings("unused")
 	public Map<?, ?> getState() {
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
index 206db28..d6a0230 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.stormcompatibility.api;
 
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
@@ -29,11 +27,16 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import java.util.List;
 
 /**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or {@link
- * IRichBolt bolt}.<br /> <br /> <strong>CAUTION: Currently, Flink does only support the default output stream.
- * Furthermore, direct emit is not supported.</strong>
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
+ * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt
+ * bolt}.<br />
+ * <br />
+ * <strong>CAUTION: Currently, Flink does only support the default output stream. Furthermore,
+ * direct emit is not supported.</strong>
  */
 final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+	// the declared output schema
 	private Fields outputSchema;
 
 	@Override
@@ -57,7 +60,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 	/**
 	 * {@inheritDoc}
 	 * <p/>
-	 * Currently, Flink only supports the default output stream. Thus, pareamter {@code streamId} must be equals to
+	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equals to
 	 * {@link Utils#DEFAULT_STREAM_ID}.
 	 *
 	 * @throws UnsupportedOperationException
@@ -71,7 +74,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 	/**
 	 * {@inheritDoc}
 	 * <p/>
-	 * Currently, Flink only supports the default output stream. Thus, pareamter {@code streamId} must be equals to
+	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equals to
 	 * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is no supported by Flink and parameter {@code direct}
 	 * must be {@code false}.
 	 *
@@ -132,15 +135,12 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
 	 * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
 	 * Flink cannot use them and will throw an exception.
-	 *
-	 * @author mjsax
 	 */
 	private static class DefaultComparable implements Comparable<DefaultComparable> {
 
 		public DefaultComparable() {
 		}
 
-		@SuppressWarnings("NullableProblems")
 		@Override
 		public int compareTo(final DefaultComparable o) {
 			return 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index 1f37bf8..c86ee8a 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -23,6 +23,7 @@ import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
 import backtype.storm.generated.SubmitOptions;
 import backtype.storm.utils.Utils;
+
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
@@ -39,7 +40,7 @@ import java.util.Map;
  * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
  */
 public class FlinkSubmitter {
-	public static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
+	public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
 
 	/**
 	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
@@ -57,7 +58,6 @@ public class FlinkSubmitter {
 	 * @throws InvalidTopologyException
 	 * 		if an invalid topology was submitted
 	 */
-	@SuppressWarnings("unused")
 	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
 			final SubmitOptions opts)
 			throws AlreadyAliveException, InvalidTopologyException {
@@ -97,7 +97,8 @@ public class FlinkSubmitter {
 		}
 		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
 			stormConf.put(Config.NIMBUS_THRIFT_PORT,
-					flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123));
+					new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+							6123)));
 		}
 
 		final String serConf = JSONValue.toJSONString(stormConf);
@@ -151,7 +152,6 @@ public class FlinkSubmitter {
 	 * @throws InvalidTopologyException
 	 * 		if an invalid topology was submitted
 	 */
-	@SuppressWarnings("unused")
 	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
 			final FlinkTopology topology)
 			throws AlreadyAliveException, InvalidTopologyException {
@@ -169,7 +169,7 @@ public class FlinkSubmitter {
 	 * 		file path of the jar file to submit
 	 * @return the value of parameter localJar
 	 */
-	@SuppressWarnings({"rawtypes", "unused"})
+	@SuppressWarnings("rawtypes")
 	public static String submitJar(final Map conf, final String localJar) {
 		return submitJar(localJar);
 	}
@@ -187,7 +187,6 @@ public class FlinkSubmitter {
 	 * 		progress listener to track the jar file upload
 	 * @return the value of parameter localJar
 	 */
-	@SuppressWarnings("rawtypes")
 	public static String submitJar(final String localJar) {
 		if (localJar == null) {
 			throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
index ae0730e..4b7f0dc 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  */
 class FlinkTopology extends StreamExecutionEnvironment {
 
-	// The corresponding {@link StormTopology} that is mimiced by this {@link FlinkTopology}
+	// The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology}
 	private final StormTopology stormTopology;
 	// The number of declared tasks for the whole program (ie, sum over all dops)
 	private int numberOfTasks = 0;
@@ -70,7 +70,6 @@ class FlinkTopology extends StreamExecutionEnvironment {
 	}
 
 	//TODO
-	@SuppressWarnings("unused")
 	public String getStormTopologyAsString() {
 		return this.stormTopology.toString();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 0f09351..41abbb1 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -142,7 +142,7 @@ public class FlinkTopologyBuilder {
 						final Grouping grouping = inputStream.getValue();
 						if (grouping.is_set_shuffle()) {
 							// Storm uses a round-robin shuffle strategy
-							inputDataStream = inputDataStream.distribute();
+							inputDataStream = inputDataStream.rebalance();
 						} else if (grouping.is_set_fields()) {
 							// global grouping is emulated in Storm via an empty fields grouping list
 							final List<String> fields = grouping.get_fields();
@@ -234,7 +234,6 @@ public class FlinkTopologyBuilder {
 	 * 		the basic bolt
 	 * @return use the returned object to declare the inputs to this component
 	 */
-	@SuppressWarnings("unused")
 	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
 		return this.setBolt(id, bolt, null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
new file mode 100644
index 0000000..e8048b0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import java.util.List;
+
+/**
+ * A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples.
+ */
+abstract class AbstractStormCollector<OUT> {
+
+	/**
+	 * Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}.
+	 */
+	protected final Tuple outputTuple;
+	/**
+	 * The number of attributes of the output tuples. (Determines the concrete type of
+	 * {@link #outputTuple}). If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not
+	 * used and "raw" data type is used.
+	 */
+	protected final int numberOfAttributes;
+	/**
+	 * Is set to {@code true} each time a tuple is emitted.
+	 */
+	boolean tupleEmitted = false;
+
+	/**
+	 * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via
+	 * {@link #doEmit(Object)}. If the number of attributes is specified as zero, any output type is
+	 * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
+	 * to {@link Tuple25}.
+	 * 
+	 * @param numberOfAttributes
+	 * 		The number of attributes of the emitted tuples.
+	 * @throws UnsupportedOperationException
+	 * 		if the specified number of attributes is not in the valid range of [0,25]
+	 */
+	public AbstractStormCollector(final int numberOfAttributes) throws UnsupportedOperationException {
+		this.numberOfAttributes = numberOfAttributes;
+
+		if (this.numberOfAttributes <= 0) {
+			this.outputTuple = null;
+		} else if (this.numberOfAttributes <= 25) {
+			try {
+				this.outputTuple = org.apache.flink.api.java.tuple.Tuple
+						.getTupleClass(this.numberOfAttributes).newInstance();
+			} catch (final InstantiationException e) {
+				throw new RuntimeException(e);
+			} catch (final IllegalAccessException e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new UnsupportedOperationException(
+					"SimpleStormBoltWrapper can handle not more then 25 attributes, but "
+					+ this.numberOfAttributes + " are declared by the given bolt");
+		}
+	}
+
+	/**
+	 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via
+	 * {@link #doEmit(Object)}.
+	 * 
+	 * @param tuple
+	 * 		The Storm tuple to be emitted.
+	 * @return the return value of {@link #doEmit(Object)}
+	 */
+	@SuppressWarnings("unchecked")
+	protected final List<Integer> tansformAndEmit(final List<Object> tuple) {
+		List<Integer> taskIds;
+		if (this.numberOfAttributes > 0) {
+			assert (tuple.size() == this.numberOfAttributes);
+			for (int i = 0; i < this.numberOfAttributes; ++i) {
+				this.outputTuple.setField(tuple.get(i), i);
+			}
+			taskIds = doEmit((OUT) this.outputTuple);
+		} else {
+			assert (tuple.size() == 1);
+			taskIds = doEmit((OUT) tuple.get(0));
+		}
+		this.tupleEmitted = true;
+
+		return taskIds;
+	}
+
+	/**
+	 * Emits a Flink tuple.
+	 * 
+	 * @param flinkTuple
+	 * 		The tuple to be emitted.
+	 * @return the IDs of the tasks this tuple was sent to
+	 */
+	protected abstract List<Integer> doEmit(OUT flinkTuple);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 5bc4635..f685f13 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -19,16 +19,16 @@ package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.topology.IRichSpout;
+
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 
 /**
  * A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm bolt within a Flink
  * Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
- * {@link StormCollector} for supported types).<br />
+ * {@link StormSpoutCollector} for supported types).<br />
  * <br />
  * <strong>CAUTION: currently, only simple spouts are supported! (ie, spouts that do not use the Storm configuration
  * <code>Map</code> or <code>TopologyContext</code> that is provided by the spouts's <code>prepare(..)</code> method.
@@ -37,13 +37,19 @@ import org.apache.flink.util.Collector;
 public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
 	private static final long serialVersionUID = 4993283609095408765L;
 
-	// The wrapped Storm {@link IRichSpout spout}
-	protected final IRichSpout spout;
-	// Number of attributes of the bolt's output tuples
+	// Number of attributes of the bolt's output tuples.
 	private final int numberOfAttributes;
-	// The wrapper of the given Flink collector
-	protected StormCollector<OUT> collector;
-	// Indicates, if the source is still running or was canceled
+	/**
+	 * The wrapped Storm {@link IRichSpout spout}.
+	 */
+	protected final IRichSpout spout;
+	/**
+	 * The wrapper of the given Flink collector.
+	 */
+	protected StormSpoutCollector<OUT> collector;
+	/**
+	 * Indicates, if the source is still running or was canceled.
+	 */
 	protected boolean isRunning = true;
 
 	/**
@@ -56,7 +62,6 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
 	 * @throws IllegalArgumentException
 	 * 		If the number of declared output attributes is not with range [1;25].
 	 */
-	@SuppressWarnings("unused")
 	public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
 		this(spout, false);
 	}
@@ -84,8 +89,8 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
 	}
 
 	@Override
-	public final void run(final Collector<OUT> collector) throws Exception {
-		this.collector = new StormCollector<OUT>(this.numberOfAttributes, collector);
+	public final void run(final SourceContext<OUT> ctx) throws Exception {
+		this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
 		this.spout.open(null,
 				StormWrapperSetupHelper
 						.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
deleted file mode 100644
index 3dbc451..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-
-import java.io.Serializable;
-
-/**
- * {@link FlinkDummyRichFunction} has the only purpose to retrieve the {@link RuntimeContext} for
- * {@link StormBoltWrapper} provided by Flink.
- */
-class FlinkDummyRichFunction implements RichFunction, Serializable {
-	private static final long serialVersionUID = 7992273349877302520L;
-
-	// The runtime context of a Storm bolt
-	private RuntimeContext context;
-
-	@Override
-	public void open(final Configuration parameters) throws Exception {/* nothing to do */}
-
-	@Override
-	public void close() throws Exception {/* nothing to do */}
-
-	@Override
-	public RuntimeContext getRuntimeContext() {
-		return this.context;
-	}
-
-	@Override
-	public void setRuntimeContext(final RuntimeContext t) {
-		this.context = t;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
new file mode 100644
index 0000000..59b5cf0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.tuple.Tuple;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.operators.Output;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link StormBoltCollector} is used by {@link StormBoltWrapper} to provided an Storm compatible
+ * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
+ * and emits them via the provide {@link Output} object.
+ */
+class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
+
+	// The Flink output object
+	private final Output<OUT> flinkOutput;
+
+	/**
+	 * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink
+	 * output object. If the number of attributes is specified as zero, any output type is
+	 * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
+	 * to {@link Tuple25}.
+	 * 
+	 * @param numberOfAttributes
+	 *        The number of attributes of the emitted tuples.
+	 * @param flinkOutput
+	 *        The Flink output object to be used.
+	 * @throws UnsupportedOperationException
+	 *         if the specified number of attributes is not in the valid range of [0,25]
+	 */
+	public StormBoltCollector(final int numberOfAttributes, final Output<OUT> flinkOutput) throws UnsupportedOperationException {
+		super(numberOfAttributes);
+		assert (flinkOutput != null);
+		this.flinkOutput = flinkOutput;
+	}
+
+	@Override
+	protected List<Integer> doEmit(final OUT flinkTuple) {
+		this.flinkOutput.collect(flinkTuple);
+		// TODO
+		return null;
+	}
+
+	@Override
+	public void reportError(final Throwable error) {
+		// not sure, if Flink can support this
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
+		return this.tansformAndEmit(tuple);
+	}
+
+	@Override
+	public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
+		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+	}
+
+	@Override
+	public void ack(final Tuple input) {
+		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
+	}
+
+	@Override
+	public void fail(final Tuple input) {
+		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index 6a5efdf..f7ed8c5 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -20,24 +20,30 @@ package org.apache.flink.stormcompatibility.wrappers;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
+
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
+
+
+
+
 /**
  * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
- * program. It takes the Flink input tuples of type {@code IN} and transforms them into a {@link StormTuple}s that the
+ * program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the
  * bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type
- * {@code OUT} (see {@link StormCollector} for supported types).<br />
+ * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br />
  * <br />
  * <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts that do not use the Storm configuration
  * <code>Map</code> or <code>TopologyContext</code> that is provided by the bolt's <code>open(..)</code> method.
  * Furthermore, acking and failing of tuples as well as accessing tuple attributes by field names is not supported so
  * far.</strong>
  */
-public class StormBoltWrapper<IN, OUT> extends StreamOperator<IN, OUT> {
+public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
 	private static final long serialVersionUID = -4788589118464155835L;
 
 	// The wrapped Storm {@link IRichBolt bolt}
@@ -46,12 +52,10 @@ public class StormBoltWrapper<IN, OUT> extends StreamOperator<IN, OUT> {
 	private final int numberOfAttributes;
 
 	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
-	 * can be
-	 * used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
-	 * depending
-	 * on the bolt's declared number of attributes.
-	 *
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
+	 * such that it can be used within a Flink streaming program. The output type will be one of
+	 * {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * 
 	 * @param bolt
 	 * 		The Storm {@link IRichBolt bolt} to be used.
 	 * @throws IllegalArgumentException
@@ -62,25 +66,23 @@ public class StormBoltWrapper<IN, OUT> extends StreamOperator<IN, OUT> {
 	}
 
 	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
-	 * can be
-	 * used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
-	 * {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
-	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of
-	 * attributes.
-	 *
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
+	 * such that it can be used within a Flink streaming program. The output type can be any type if
+	 * parameter {@code rawOutput} is {@code true} and the bolt's number of declared output tuples
+	 * is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
+	 * {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * 
 	 * @param bolt
 	 * 		The Storm {@link IRichBolt bolt} to be used.
 	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 * 		of a raw type.
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type
+	 * 		{@link Tuple1} but be of a raw type.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 * 		{@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 * 		[1;25].
+	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is
+	 * 		not 1 or if {@code rawOuput} is {@code false} and the number of declared output
+	 * 		attributes is not with range [1;25].
 	 */
 	public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException {
-		super(new FlinkDummyRichFunction());
 		this.bolt = bolt;
 		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
 	}
@@ -89,38 +91,27 @@ public class StormBoltWrapper<IN, OUT> extends StreamOperator<IN, OUT> {
 	public void open(final Configuration parameters) throws Exception {
 		super.open(parameters);
 
-		final StreamingRuntimeContext flinkContext = (StreamingRuntimeContext) ((FlinkDummyRichFunction) super
-				.getUserFunction()).getRuntimeContext();
-
-		final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(flinkContext, false);
+		final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
+				(StreamingRuntimeContext)super.runtimeContext, false);
 		OutputCollector stormCollector = null;
 
 		if (this.numberOfAttributes != -1) {
-			stormCollector = new OutputCollector(new StormCollector<OUT>(this.numberOfAttributes, super.collector));
+			stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
+					this.numberOfAttributes, super.output));
 		}
 
 		this.bolt.prepare(null, topologyContext, stormCollector);
 	}
 
 	@Override
-	public void close() {
+	public void close() throws Exception {
 		super.close();
 		this.bolt.cleanup();
 	}
 
-	/**
-	 * Transforms a Flink tuple into a Storm tuple and calls the bolt's {@code execute} method.
-	 */
-	@Override
-	protected void callUserFunction() throws Exception {
-		this.bolt.execute(new StormTuple<IN>(this.nextRecord.getObject()));
-	}
-
 	@Override
-	public void run() throws Exception {
-		while (this.readNext() != null) {
-			this.callUserFunctionAndLogException();
-		}
+	public void processElement(final IN element) throws Exception {
+		this.bolt.execute(new StormTuple<IN>(element));
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
deleted file mode 100644
index 1c13e88..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-/* we do not import
- * --> "org.apache.flink.api.java.tuple.Tuple"
- * or
- * --> "backtype.storm.tuple.Tuple"
- * to avoid confusion
- */
-
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.task.IOutputCollector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.util.Collector;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@link StormCollector} is used by {@link AbstractStormSpoutWrapper} and {@link StormBoltWrapper} to provided an
- * Storm compatible output collector to the wrapped spout or bolt, respectively. Additionally, {@link StormCollector}
- * transforms the bolt's output tuples into Flink tuples and emits the created Flink tuples using a Flink
- * {@link Collector}.
- */
-class StormCollector<OUT> implements ISpoutOutputCollector, IOutputCollector {
-
-	// The Flink collector
-	private final Collector<OUT> flinkCollector;
-	// The Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}
-	private final org.apache.flink.api.java.tuple.Tuple outputTuple;
-	// The number of attributes of the output tuples. (Determines the concrete type of {@link #outputTuple})
-	private final int numberOfAttributes;
-	// Is set to {@code true} each time a tuple is emitted
-	boolean tupleEmitted = false;
-
-	/**
-	 * Instantiates a new {@link StormCollector} that emits Flink tuples to the given Flink collector. If the number of
-	 * attributes is specified as zero, any output type is supported. If the number of attributes is between 1 to 25,
-	 * the output type is {@link Tuple1} to {@link Tuple25}.
-	 *
-	 * @param numberOfAttributes
-	 * 		The number of attributes of the emitted tuples.
-	 * @param flinkCollector
-	 * 		The Flink collector to be used.
-	 * @throws UnsupportedOperationException
-	 * 		if the specified number of attributes is not in the valid range of [0,25]
-	 */
-	public StormCollector(final int numberOfAttributes, final Collector<OUT> flinkCollector)
-			throws UnsupportedOperationException {
-		this.numberOfAttributes = numberOfAttributes;
-		this.flinkCollector = flinkCollector;
-
-		if (this.numberOfAttributes <= 0) {
-			this.outputTuple = null;
-		} else if (this.numberOfAttributes <= 25) {
-			try {
-				this.outputTuple = Tuple.getTupleClass(this.numberOfAttributes).newInstance();
-			} catch (final InstantiationException e) {
-				throw new RuntimeException(e);
-			} catch (final IllegalAccessException e) {
-				throw new RuntimeException(e);
-			}
-		} else {
-			throw new UnsupportedOperationException(
-					"SimpleStormBoltWrapper can handle not more then 25 attributes, but " + this.numberOfAttributes
-							+ " are declared by the given bolt");
-		}
-	}
-
-	@Override
-	public void reportError(final Throwable error) {
-		// not sure, if Flink can support this
-		throw new UnsupportedOperationException("Not implemented yet");
-	}
-
-	@Override
-	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
-		return this.emitImpl(tuple);
-	}
-
-	@Override
-	public List<Integer> emit(final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors,
-			final List<Object> tuple) {
-		return this.emitImpl(tuple);
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<Integer> emitImpl(
-			final List<Object> tuple) {
-		if (this.numberOfAttributes > 0) {
-			assert (tuple.size() == this.numberOfAttributes);
-			for (int i = 0; i < this.numberOfAttributes; ++i) {
-				this.outputTuple.setField(tuple.get(i), i);
-			}
-			this.flinkCollector.collect((OUT) this.outputTuple);
-		} else {
-			assert (tuple.size() == 1);
-			this.flinkCollector.collect((OUT) tuple.get(0));
-		}
-		this.tupleEmitted = true;
-
-		// TODO
-		return null;
-	}
-
-	@Override
-	public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
-		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-	}
-
-	@Override
-	public void emitDirect(final int taskId, final String streamId,
-			final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple) {
-		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-	}
-
-	@Override
-	public void ack(final backtype.storm.tuple.Tuple input) {
-		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
-	}
-
-	@Override
-	public void fail(final backtype.storm.tuple.Tuple input) {
-		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
index ebbf80f..6cf8c1b 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
@@ -22,10 +22,12 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 
 /**
- * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calles {@link IRichSpout#nextTuple()
- * nextTuple()} for finite number of times before {@link #run(org.apache.flink.util.Collector)} returns. The number of
- * {@code nextTuple()} calls can be specified as a certain number of invocations or can be undefined. In the undefined
- * case, the {@code run(...)} method return if no record was emitted to the output collector for the first time.
+ * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls
+ * {@link IRichSpout#nextTuple() nextTuple()} for finite number of times before
+ * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)}
+ * returns. The number of {@code nextTuple()} calls can be specified as a certain number of
+ * invocations or can be undefined. In the undefined case, the {@code run(...)} method return if no
+ * record was emitted to the output collector for the first time.
  */
 public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
 	private static final long serialVersionUID = 3883246587044801286L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
index 6005d6d..c486237 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
@@ -22,8 +22,8 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
 /**
- * {@link StormOutputFieldsDeclarer} is used by {@link StormBoltWrapper} to determine the number of attributes declared
- * by the wrapped bolt's {@code declare(...)} method.
+ * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the
+ * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)} method.
  */
 class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
new file mode 100644
index 0000000..01c980a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import java.util.List;
+
+/**
+ * A {@link StormSpoutCollector} is used by {@link AbstractStormSpoutWrapper} to provided an Storm
+ * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
+ * Flink tuples and emits them via the provide {@link SourceContext} object.
+ */
+class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
+
+	// The Flink source context object
+	private final SourceContext<OUT> flinkContext;
+
+	/**
+	 * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink
+	 * source context. If the number of attributes is specified as zero, any output type is
+	 * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
+	 * to {@link Tuple25}.
+	 * 
+	 * @param numberOfAttributes
+	 *        The number of attributes of the emitted tuples.
+	 * @param flinkContext
+	 *        The Flink source context to be used.
+	 * @throws UnsupportedOperationException
+	 *         if the specified number of attributes is not in the valid range of [0,25]
+	 */
+	public StormSpoutCollector(final int numberOfAttributes, final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
+		super(numberOfAttributes);
+		assert (flinkContext != null);
+		this.flinkContext = flinkContext;
+	}
+
+	@Override
+	protected List<Integer> doEmit(final OUT flinkTuple) {
+		this.flinkContext.collect(flinkTuple);
+		// TODO
+		return null;
+	}
+
+	@Override
+	public void reportError(final Throwable error) {
+		// not sure, if Flink can support this
+		throw new UnsupportedOperationException("Not implemented yet");
+	}
+
+	@Override
+	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
+		return this.tansformAndEmit(tuple);
+	}
+
+
+	@Override
+	public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
+		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
index 8019d7d..3fb1b06 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
@@ -15,13 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.wrappers;
 
+/*
+ * We do neither import
+ * 		backtype.storm.tuple.Tuple;
+ * nor
+ * 		org.apache.flink.api.java.tuple.Tuple
+ * to avoid confusion
+ */
+
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
 import java.util.List;
@@ -29,7 +35,7 @@ import java.util.List;
 /**
  * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
  */
-class StormTuple<IN> implements Tuple {
+class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 
 	// The storm representation of the original Flink tuple
 	private final Values stormTuple;

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
index a8f99e6..a189fba 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -25,8 +25,6 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
@@ -40,22 +38,22 @@ import java.util.Map;
 class StormWrapperSetupHelper {
 
 	/**
-	 * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link
-	 * StormBoltWrapper}.
-	 * Returns zero for raw output type or a value within range [1;25] for output type {@link Tuple1} to {@link
-	 * Tuple25}
-	 * . In case of a data sink, {@code -1} is returned. .
-	 *
+	 * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or
+	 * {@link StormBoltWrapper}. Returns zero for raw output type or a value within range [1;25] for
+	 * output type {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} to
+	 * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25} . In case of a data sink, {@code -1}
+	 * is returned. .
+	 * 
 	 * @param spoutOrBolt
 	 * 		The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
 	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 * 		of a raw type.
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type
+	 * 		{@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but be of a raw type.
 	 * @return The number of attributes to be used.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 * 		{@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 * 		[1;25].
+	 * 		If {@code rawOuput} is {@code true} and the number of declared output
+	 * 		attributes is not 1 or if {@code rawOuput} is {@code false} and the number
+	 * 		of declared output attributes is not with range [1;25].
 	 */
 	public static int getNumberOfAttributes(final IComponent spoutOrBolt, final boolean rawOutput)
 			throws IllegalArgumentException {
@@ -79,7 +77,7 @@ class StormWrapperSetupHelper {
 			if (declaredNumberOfAttributes > 1) {
 				throw new IllegalArgumentException(
 						"Ouput type is requested to be raw type, but provided bolt declares more then one output " +
-								"attribute.");
+						"attribute.");
 
 			}
 			return 0;
@@ -91,7 +89,7 @@ class StormWrapperSetupHelper {
 	// TODO
 	public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context,
 			final boolean spoutOrBolt) {
-		final Integer taskId = 1 + context.getIndexOfThisSubtask();
+		final Integer taskId = new Integer(1 + context.getIndexOfThisSubtask());
 
 		final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
 		taskToComponents.put(taskId, context.getTaskName());

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
index 4182ad0..96b5aea 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
@@ -39,7 +39,7 @@ class FiniteTestSpout implements IRichSpout {
 	@SuppressWarnings("rawtypes")
 	@Override
 	public void open(final Map conf, final TopologyContext context,
-			final SpoutOutputCollector collector) {
+			@SuppressWarnings("hiding") final SpoutOutputCollector collector) {
 		this.collector = collector;
 	}
 
@@ -55,7 +55,7 @@ class FiniteTestSpout implements IRichSpout {
 	@Override
 	public void nextTuple() {
 		if (--this.numberOfOutputTuples >= 0) {
-			this.collector.emit(new Values(this.numberOfOutputTuples));
+			this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
deleted file mode 100644
index 2c2a221..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class FlinkDummyRichFunctionTest {
-
-	@Test
-	public void testRuntimeContext() {
-		final FlinkDummyRichFunction dummy = new FlinkDummyRichFunction();
-
-		final RuntimeContext context = mock(RuntimeContext.class);
-		dummy.setRuntimeContext(context);
-
-		Assert.assertSame(context, dummy.getRuntimeContext());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
new file mode 100644
index 0000000..2789ce5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.api.operators.Output;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class StormBoltCollectorTest extends AbstractTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
+		for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+			final Output flinkCollector = mock(Output.class);
+			Tuple flinkTuple = null;
+			final Values tuple = new Values();
+
+			StormBoltCollector<?> collector;
+
+			if (numberOfAttributes == 0) {
+				collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+				tuple.add(new Integer(this.r.nextInt()));
+
+			} else {
+				collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+				for (int i = 0; i < numberOfAttributes; ++i) {
+					tuple.add(new Integer(this.r.nextInt()));
+					flinkTuple.setField(tuple.get(i), i);
+				}
+			}
+
+			final String streamId = "streamId";
+			final Collection anchors = mock(Collection.class);
+			final List<Integer> taskIds;
+			taskIds = collector.emit(streamId, anchors, tuple);
+
+			Assert.assertNull(taskIds);
+
+			if (numberOfAttributes == 0) {
+				verify(flinkCollector).collect(tuple.get(0));
+			} else {
+				verify(flinkCollector).collect(flinkTuple);
+			}
+		}
+	}
+
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testReportError() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).reportError(null);
+	}
+
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	@Test(expected = UnsupportedOperationException.class)
+	public void testEmitDirect() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).emitDirect(0, null,
+				(Collection) null, null);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testAck() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).ack(null);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testFail() {
+		new StormBoltCollector<Object>(1, mock(Output.class)).fail(null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index f2cfe59..780c75e 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -21,14 +21,13 @@ import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
+
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
@@ -41,7 +40,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -49,6 +47,7 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
 public class StormBoltWrapperTest {
 
+	@SuppressWarnings("unused")
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperRawType() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -58,6 +57,7 @@ public class StormBoltWrapperTest {
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
 	}
 
+	@SuppressWarnings("unused")
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes1() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -71,6 +71,7 @@ public class StormBoltWrapperTest {
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
 	}
 
+	@SuppressWarnings("unused")
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes2() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -94,7 +95,6 @@ public class StormBoltWrapperTest {
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	private void testWrapper(final int numberOfAttributes) throws Exception {
 		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
-
 		Tuple flinkTuple = null;
 		String rawTuple = null;
 
@@ -119,15 +119,7 @@ public class StormBoltWrapperTest {
 			when(record.getObject()).thenReturn(flinkTuple);
 		}
 
-		final StreamRecordSerializer serializer = mock(StreamRecordSerializer.class);
-		when(serializer.createInstance()).thenReturn(record);
-
-		final IndexedReaderIterator reader = mock(IndexedReaderIterator.class);
-		when(reader.next(record)).thenReturn(record).thenReturn(null);
-
-		final StreamTaskContext taskContext = mock(StreamTaskContext.class);
-		when(taskContext.getInputSerializer(0)).thenReturn(serializer);
-		when(taskContext.getIndexedInput(0)).thenReturn(reader);
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 
 		final IRichBolt bolt = mock(IRichBolt.class);
 
@@ -136,23 +128,17 @@ public class StormBoltWrapperTest {
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
-		wrapper.setup(taskContext);
+		wrapper.setup(mock(Output.class), taskContext);
 
-		wrapper.callUserFunction();
+		wrapper.processElement(record.getObject());
 		if (numberOfAttributes == 0) {
 			verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
 		} else {
 			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
 		}
-
-		wrapper.run();
-		if (numberOfAttributes == 0) {
-			verify(bolt, times(2)).execute(eq(new StormTuple<String>(rawTuple)));
-		} else {
-			verify(bolt, times(2)).execute(eq(new StormTuple<Tuple>(flinkTuple)));
-		}
 	}
 
+	@SuppressWarnings("unchecked")
 	@Test
 	public void testOpen() throws Exception {
 		final IRichBolt bolt = mock(IRichBolt.class);
@@ -162,18 +148,19 @@ public class StormBoltWrapperTest {
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
 
 		wrapper.open(mock(Configuration.class));
 
 		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
 	}
 
+	@SuppressWarnings("unchecked")
 	@Test
 	public void testOpenSink() throws Exception {
 		final IRichBolt bolt = mock(IRichBolt.class);
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
 
 		wrapper.open(mock(Configuration.class));
 
@@ -191,9 +178,9 @@ public class StormBoltWrapperTest {
 
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
 
-		final StreamTaskContext<Object> taskContext = mock(StreamTaskContext.class);
-		when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
-		wrapper.setup(taskContext);
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		// when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
+		wrapper.setup(mock(Output.class), taskContext);
 
 		wrapper.close();
 		verify(bolt).cleanup();

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
deleted file mode 100644
index 925da04..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Values;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class StormCollectorTest extends AbstractTest {
-
-	@Test
-	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
-		for (int i = 0; i < 26; ++i) {
-			this.testStromCollector(true, i);
-		}
-	}
-
-	@Test
-	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
-		for (int i = 0; i < 26; ++i) {
-			this.testStromCollector(false, i);
-		}
-	}
-
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	private void testStromCollector(final boolean spoutTest, final int numberOfAttributes)
-			throws InstantiationException, IllegalAccessException {
-		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
-
-		final Collector flinkCollector = mock(Collector.class);
-		Tuple flinkTuple = null;
-		final Values tuple = new Values();
-
-		StormCollector<?> collector;
-
-		if (numberOfAttributes == 0) {
-			collector = new StormCollector(numberOfAttributes, flinkCollector);
-			tuple.add(this.r.nextInt());
-
-		} else {
-			collector = new StormCollector(numberOfAttributes, flinkCollector);
-			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-
-			for (int i = 0; i < numberOfAttributes; ++i) {
-				tuple.add(this.r.nextInt());
-				flinkTuple.setField(tuple.get(i), i);
-			}
-		}
-
-		final String streamId = "streamId";
-		final Collection anchors = mock(Collection.class);
-		final List<Integer> taskIds;
-		final Object messageId = this.r.nextInt();
-		if (spoutTest) {
-			taskIds = collector.emit(streamId, tuple, messageId);
-		} else {
-			taskIds = collector.emit(streamId, anchors, tuple);
-		}
-
-		Assert.assertNull(taskIds);
-
-		if (numberOfAttributes == 0) {
-			verify(flinkCollector).collect(tuple.get(0));
-		} else {
-			verify(flinkCollector).collect(flinkTuple);
-		}
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testReportError() {
-		new StormCollector<Object>(1, null).reportError(null);
-	}
-
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	@Test(expected = UnsupportedOperationException.class)
-	public void testBoltEmitDirect() {
-		new StormCollector<Object>(1, null).emitDirect(0, null, (Collection) null, null);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testSpoutEmitDirect() {
-		new StormCollector<Object>(1, null).emitDirect(0, null, null, (Object) null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testAck() {
-		new StormCollector<Object>(1, null).ack(null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testFail() {
-		new StormCollector<Object>(1, null).fail(null);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index 0c5b124..c890ab1 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -19,8 +19,10 @@ package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
+
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,6 +41,7 @@ import static org.mockito.Mockito.verify;
 @PrepareForTest(StormWrapperSetupHelper.class)
 public class StormFiniteSpoutWrapperTest extends AbstractTest {
 
+	@SuppressWarnings("unchecked")
 	@Test
 	public void testRunExecuteFixedNumber() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -50,7 +53,7 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 		final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
 
-		spoutWrapper.run(null);
+		spoutWrapper.run(mock(SourceContext.class));
 		verify(spout, times(numberOfCalls)).nextTuple();
 	}
 
@@ -60,7 +63,7 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 
 		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
 		for (int i = numberOfCalls - 1; i >= 0; --i) {
-			expectedResult.add(new Tuple1<Integer>(i));
+			expectedResult.add(new Tuple1<Integer>(new Integer(i)));
 		}
 
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
@@ -68,7 +71,7 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 				spout);
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
 
-		final TestCollector collector = new TestCollector();
+		final TestContext collector = new TestContext();
 		spoutWrapper.run(collector);
 
 		Assert.assertEquals(expectedResult, collector.result);
@@ -79,7 +82,7 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 		final int numberOfCalls = 5 + this.r.nextInt(5);
 
 		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
-		expectedResult.add(new Tuple1<Integer>(numberOfCalls - 1));
+		expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
 
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
@@ -87,7 +90,7 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
 
 		spoutWrapper.cancel();
-		final TestCollector collector = new TestCollector();
+		final TestContext collector = new TestContext();
 		spoutWrapper.run(collector);
 
 		Assert.assertEquals(expectedResult, collector.result);

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
index a72eb19..cfde770 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -42,7 +42,7 @@ public class StormOutputFieldsDeclarerTest extends AbstractTest {
 		Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
 	}
 
-	@SuppressWarnings("unused")
+	@Test
 	public void testDeclareDirect() {
 		new StormOutputFieldsDeclarer().declare(false, null);
 	}
@@ -52,7 +52,7 @@ public class StormOutputFieldsDeclarerTest extends AbstractTest {
 		new StormOutputFieldsDeclarer().declare(true, null);
 	}
 
-	@SuppressWarnings("unused")
+	@Test
 	public void testDeclareStream() {
 		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
 	}
@@ -62,7 +62,7 @@ public class StormOutputFieldsDeclarerTest extends AbstractTest {
 		new StormOutputFieldsDeclarer().declareStream(null, null);
 	}
 
-	@SuppressWarnings("unused")
+	@Test
 	public void testDeclareFullStream() {
 		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e39699f3/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
new file mode 100644
index 0000000..e4826bb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class StormSpoutCollectorTest extends AbstractTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
+		for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+			final SourceContext flinkCollector = mock(SourceContext.class);
+			Tuple flinkTuple = null;
+			final Values tuple = new Values();
+
+			StormSpoutCollector<?> collector;
+
+			if (numberOfAttributes == 0) {
+				collector = new StormSpoutCollector(numberOfAttributes, flinkCollector);
+				tuple.add(new Integer(this.r.nextInt()));
+
+			} else {
+				collector = new StormSpoutCollector(numberOfAttributes, flinkCollector);
+				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+				for (int i = 0; i < numberOfAttributes; ++i) {
+					tuple.add(new Integer(this.r.nextInt()));
+					flinkTuple.setField(tuple.get(i), i);
+				}
+			}
+
+			final String streamId = "streamId";
+			final List<Integer> taskIds;
+			final Object messageId = new Integer(this.r.nextInt());
+
+			taskIds = collector.emit(streamId, tuple, messageId);
+
+			Assert.assertNull(taskIds);
+
+			if (numberOfAttributes == 0) {
+				verify(flinkCollector).collect(tuple.get(0));
+			} else {
+				verify(flinkCollector).collect(flinkTuple);
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testReportError() {
+		new StormSpoutCollector<Object>(1, mock(SourceContext.class)).reportError(null);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testEmitDirect() {
+		new StormSpoutCollector<Object>(1, mock(SourceContext.class)).emitDirect(0, null, null,
+				(Object) null);
+	}
+
+}