You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:39 UTC

[14/15] flink git commit: [FLINK-2566] FlinkTopologyContext not populated completely - extended FlinkTopologyContext to be populated with all supportable attributes - added JUnit test - updated README.md additionally: module restructuring to get cle

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
new file mode 100644
index 0000000..1891873
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Storm spout for tests that emits a fixed number of tuples and then stays silent.
+ *
+ * <p>Every tuple has a single field, declared as {@code "dummy"}, that holds the number of tuples still to be
+ * emitted after it; the values therefore count down from {@code numberOfOutputTuples - 1} to {@code 0}.
+ */
+public class FiniteTestSpout implements IRichSpout {
+	private static final long serialVersionUID = 7992419478267824279L;
+
+	/** Number of tuples that still have to be emitted. */
+	private int numberOfOutputTuples;
+	/** Collector received in {@code open}; used to emit the tuples. */
+	private SpoutOutputCollector collector;
+
+	/**
+	 * Creates a spout that emits the given number of tuples.
+	 *
+	 * @param numberOfOutputTuples
+	 *            the number of tuples to emit; no tuple is emitted if the value is not positive
+	 */
+	public FiniteTestSpout(final int numberOfOutputTuples) {
+		this.numberOfOutputTuples = numberOfOutputTuples;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void close() {/* nothing to do */}
+
+	@Override
+	public void activate() {/* nothing to do */}
+
+	@Override
+	public void deactivate() {/* nothing to do */}
+
+	/**
+	 * Emits the next countdown value while tuples are outstanding; does nothing afterwards.
+	 */
+	@Override
+	public void nextTuple() {
+		// Test before decrementing: the counter stops at zero, so it cannot underflow and wrap around to a
+		// positive value when nextTuple() keeps being called on an exhausted spout.
+		if (this.numberOfOutputTuples > 0) {
+			--this.numberOfOutputTuples;
+			// Integer.valueOf replaces the deprecated Integer(int) constructor.
+			this.collector.emit(new Values(Integer.valueOf(this.numberOfOutputTuples)));
+		}
+	}
+
+	@Override
+	public void ack(final Object msgId) {/* nothing to do */}
+
+	@Override
+	public void fail(final Object msgId) {/* nothing to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("dummy"));
+	}
+
+	/**
+	 * Always returns {@code null}; this spout defines no configuration of its own.
+	 */
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..8e63563
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
+/**
+ * Tests for {@link FlinkOutputFieldsDeclarer}: output type derivation per declared stream, rejected declarations
+ * and the translation of grouping field names into field indexes.
+ */
+public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
+
+	/** Asking for the output type of the {@code null} stream yields {@code null}. */
+	@Test
+	public void testNull() {
+		Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
+	}
+
+	/** Declares 0 to 25 attributes with both declaration variants and checks the resulting output type. */
+	@Test
+	public void testDeclare() {
+		for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
+			// NOTE(review): this loop only ever runs with a single stream (j == 1) -- confirm that is intended
+			for (int j = 1; j < 2; ++j) { // number of streams
+				for (int k = 0; k <= 25; ++k) { // number of attributes
+					this.runDeclareTest(i, j, k);
+				}
+			}
+		}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareSimpleToManyAttributes() {
+		this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareNonDirectToManyAttributes() {
+		this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	// NOTE(review): runDeclareTest treats every test case other than 0 as "non-direct", so the following two tests
+	// currently exercise the same code path as testDeclareNonDirectToManyAttributes.
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareDefaultStreamToManyAttributes() {
+		this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareFullToManyAttributes() {
+		this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	/**
+	 * Declares the given number of attributes on the given number of streams and verifies the output type of each
+	 * declared stream.
+	 *
+	 * @param testCase
+	 *            {@code 0} uses the declare methods without the direct flag; any other value passes
+	 *            {@code direct = false} explicitly
+	 * @param numberOfStreams
+	 *            number of streams to declare; for a single stream the default stream is used at random
+	 * @param numberOfAttributes
+	 *            number of attributes declared for every stream
+	 */
+	private void runDeclareTest(final int testCase, final int numberOfStreams,
+			final int numberOfAttributes) {
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+
+		// streams == null means "declare on the default stream"
+		String[] streams = null;
+		if (numberOfStreams > 1 || r.nextBoolean()) {
+			streams = new String[numberOfStreams];
+			for (int i = 0; i < numberOfStreams; ++i) {
+				streams[i] = "stream" + i;
+			}
+		}
+
+		final String[] attributes = new String[numberOfAttributes];
+		for (int i = 0; i < attributes.length; ++i) {
+			attributes[i] = "a" + i;
+		}
+
+		switch (testCase) {
+		case 0:
+			this.declareSimple(declarer, streams, attributes);
+			break;
+		default:
+			this.declareNonDirect(declarer, streams, attributes);
+		}
+
+		if (streams == null) {
+			streams = new String[] { Utils.DEFAULT_STREAM_ID };
+		}
+
+		for (String stream : streams) {
+			final TypeInformation<?> type = declarer.getOutputType(stream);
+
+			if (numberOfAttributes == 1) {
+				// expected value first, so that a failure message reports the two values correctly
+				Assert.assertEquals(GenericTypeInfo.class, type.getClass());
+				Assert.assertEquals(Object.class, type.getTypeClass());
+			} else {
+				Assert.assertEquals(numberOfAttributes, type.getArity());
+				Assert.assertTrue(type.isTupleType());
+			}
+		}
+	}
+
+	/** Declares the attributes without specifying the direct flag, on the given streams or the default stream. */
+	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+			final String[] attributes) {
+
+		if (streams != null) {
+			for (String stream : streams) {
+				declarer.declareStream(stream, new Fields(attributes));
+			}
+		} else {
+			declarer.declare(new Fields(attributes));
+		}
+	}
+
+	/** Declares the attributes with {@code direct = false}, on the given streams or the default stream. */
+	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+			final String[] attributes) {
+
+		if (streams != null) {
+			for (String stream : streams) {
+				declarer.declareStream(stream, false, new Fields(attributes));
+			}
+		} else {
+			declarer.declare(false, new Fields(attributes));
+		}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testUndeclared() {
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarer.getOutputType("unknownStreamId");
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirect() {
+		new FlinkOutputFieldsDeclarer().declare(true, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirect2() {
+		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+	}
+
+	/**
+	 * Declares 5 to 25 attributes on the default stream, picks a random subset as grouping fields and checks that
+	 * the declarer returns the positions of exactly those attributes, in declaration order.
+	 */
+	@Test
+	public void testGetGroupingFieldIndexes() {
+		final int numberOfAttributes = 5 + this.r.nextInt(21);
+		final String[] attributes = new String[numberOfAttributes];
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			attributes[i] = "a" + i;
+		}
+
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarer.declare(new Fields(attributes));
+
+		final int numberOfKeys = 1 + this.r.nextInt(25);
+		final LinkedList<String> groupingFields = new LinkedList<String>();
+		final boolean[] indexes = new boolean[numberOfAttributes]; // all false initially
+
+		// randomly pick attributes as grouping fields; the subset may also turn out empty
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			if (this.r.nextInt(26) < numberOfKeys) {
+				groupingFields.add(attributes[i]);
+				indexes[i] = true;
+			}
+		}
+
+		final int[] expectedResult = new int[groupingFields.size()];
+		int j = 0;
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			if (indexes[i]) {
+				expectedResult[j++] = i;
+			}
+		}
+
+		final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
+				groupingFields);
+
+		// compares the length and every element of both arrays
+		Assert.assertArrayEquals(expectedResult, result);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
new file mode 100644
index 0000000..c3cb7d7
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link FlinkStormStreamSelector} selects exactly the stream named by a tuple's stream ID.
+ */
+public class FlinkStormStreamSelectorTest {
+
+	@Test
+	public void testSelector() {
+		final FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
+		final SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+
+		// "stream1" is checked twice, i.e. once more after a different stream ID has been selected
+		for (final String streamId : new String[] { "stream1", "stream2", "stream1" }) {
+			tuple.streamId = streamId;
+
+			final Iterator<String> selected = selector.select(tuple).iterator();
+			Assert.assertEquals(streamId, selected.next());
+			Assert.assertFalse(selected.hasNext());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..bd9ea3f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.HashMap;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+
+import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
+import org.junit.Test;
+
+
+/**
+ * Checks that the operations {@link FlinkTopologyContext} does not support fail with an
+ * {@link UnsupportedOperationException}.
+ *
+ * <p>FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested
+ * here, because those are tested in StormWrapperSetupHelperTest.
+ */
+public class FlinkTopologyContextTest extends AbstractTest {
+
+	/**
+	 * Builds the context used by every test: an empty topology and {@code null} for every other constructor
+	 * argument.
+	 */
+	private static FlinkTopologyContext createContext() {
+		return new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testAddTaskHook() {
+		createContext().addTaskHook(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetHooks() {
+		createContext().getHooks();
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric1() {
+		createContext().registerMetric(null, (ICombiner) null, 0);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric2() {
+		createContext().registerMetric(null, (IReducer) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric3() {
+		createContext().registerMetric(null, (IMetric) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetRegisteredMetricByName() {
+		createContext().getRegisteredMetricByName(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetAllSubscribedState() {
+		createContext().setAllSubscribedState(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState1() {
+		createContext().setSubscribedState(null, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState2() {
+		createContext().setSubscribedState(null, null, null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
index ec48719..b499373 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
 
 public class TestDummyBolt implements IRichBolt {
 	private static final long serialVersionUID = 6893611247443121322L;
@@ -31,12 +32,27 @@ public class TestDummyBolt implements IRichBolt {
 	public final static String shuffleStreamId = "shuffleStream";
 	public final static String groupingStreamId = "groupingStream";
 
+	private boolean emit = true;
+	private TopologyContext context;
+	private OutputCollector collector;
+
 	@SuppressWarnings("rawtypes")
 	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		this.context = context;
+		this.collector = collector;
+	}
 
 	@Override
-	public void execute(Tuple input) {}
+	public void execute(Tuple input) {
+		if (this.context.getThisTaskIndex() == 0) {
+			this.collector.emit(shuffleStreamId, input.getValues());
+		}
+		if (this.emit) {
+			this.collector.emit(groupingStreamId, new Values("bolt", this.context));
+			this.emit = false;
+		}
+	}
 
 	@Override
 	public void cleanup() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
index 62705b8..345ca12 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
@@ -23,6 +23,7 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
 public class TestDummySpout implements IRichSpout {
@@ -30,9 +31,16 @@ public class TestDummySpout implements IRichSpout {
 
 	public final static String spoutStreamId = "spout-stream";
 
+	private boolean emit = true;
+	private TopologyContext context;
+	private SpoutOutputCollector collector;
+
 	@SuppressWarnings("rawtypes")
 	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		this.context = context;
+		this.collector = collector;
+	}
 
 	@Override
 	public void close() {}
@@ -44,7 +52,12 @@ public class TestDummySpout implements IRichSpout {
 	public void deactivate() {}
 
 	@Override
-	public void nextTuple() {}
+	public void nextTuple() {
+		if (this.emit) {
+			this.collector.emit(new Values(this.context));
+			this.emit = false;
+		}
+	}
 
 	@Override
 	public void ack(Object msgId) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
index 5699219..c8e5584 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
@@ -16,6 +16,8 @@
  */
 package org.apache.flink.stormcompatibility.util;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import backtype.storm.task.OutputCollector;
@@ -27,12 +29,22 @@ import backtype.storm.tuple.Tuple;
 public class TestSink implements IRichBolt {
 	private static final long serialVersionUID = 4314871456719370877L;
 
+	public final static List<TopologyContext> result = new LinkedList<TopologyContext>();
+
 	@SuppressWarnings("rawtypes")
 	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		result.add(context);
+	}
 
 	@Override
-	public void execute(Tuple input) {}
+	public void execute(Tuple input) {
+		// Tuples with a single field hold the context at position 0,
+		// all other tuples hold it at position 1.
+		final int contextIndex = (input.size() == 1) ? 0 : 1;
+		final Object context = input.getValue(contextIndex);
+		result.add((TopologyContext) context);
+	}
 
 	@Override
 	public void cleanup() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
index 381e130..b44e8a1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -18,6 +18,8 @@
 package org.apache.flink.stormcompatibility.wrappers;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Test;
@@ -43,6 +45,8 @@ public class FiniteStormSpoutWrapperTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
 		wrapper.setRuntimeContext(taskContext);
@@ -59,6 +63,8 @@ public class FiniteStormSpoutWrapperTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
 		wrapper.setRuntimeContext(taskContext);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
deleted file mode 100644
index eef35cf..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-class FiniteTestSpout implements IRichSpout {
-	private static final long serialVersionUID = 7992419478267824279L;
-
-	private int numberOfOutputTuples;
-	private SpoutOutputCollector collector;
-
-	public FiniteTestSpout(final int numberOfOutputTuples) {
-		this.numberOfOutputTuples = numberOfOutputTuples;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void close() {/* nothing to do */}
-
-	@Override
-	public void activate() {/* nothing to do */}
-
-	@Override
-	public void deactivate() {/* nothing to do */}
-
-	@Override
-	public void nextTuple() {
-		if (--this.numberOfOutputTuples >= 0) {
-			this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
-		}
-	}
-
-	@Override
-	public void ack(final Object msgId) {/* nothing to do */}
-
-	@Override
-	public void fail(final Object msgId) {/* nothing to do */}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("dummy"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
deleted file mode 100644
index c0a6ed3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Iterator;
-
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class FlinkStormStreamSelectorTest {
-
-	@Test
-	public void testSelector() {
-		FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
-		SplitStreamType<Object> tuple = new SplitStreamType<Object>();
-		Iterator<String> result;
-
-		tuple.streamId = "stream1";
-		result = selector.select(tuple).iterator();
-		Assert.assertEquals("stream1", result.next());
-		Assert.assertFalse(result.hasNext());
-
-		tuple.streamId = "stream2";
-		result = selector.select(tuple).iterator();
-		Assert.assertEquals("stream2", result.next());
-		Assert.assertFalse(result.hasNext());
-
-		tuple.streamId = "stream1";
-		result = selector.select(tuple).iterator();
-		Assert.assertEquals("stream1", result.next());
-		Assert.assertFalse(result.hasNext());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..738eb1e
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class SetupOutputFieldsDeclarerTest extends AbstractTest {
+
+	@Test
+	public void testDeclare() {
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+
+		int numberOfAttributes = this.r.nextInt(26);
+		declarer.declare(createSchema(numberOfAttributes));
+		Assert.assertEquals(1, declarer.outputSchemas.size());
+		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
+				.intValue());
+
+		final String sid = "streamId";
+		numberOfAttributes = this.r.nextInt(26);
+		declarer.declareStream(sid, createSchema(numberOfAttributes));
+		Assert.assertEquals(2, declarer.outputSchemas.size());
+		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
+	}
+
+	private Fields createSchema(final int numberOfAttributes) {
+		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			schema.add("a" + i);
+		}
+		return new Fields(schema);
+	}
+
+	@Test
+	public void testDeclareDirect() {
+		new SetupOutputFieldsDeclarer().declare(false, new Fields());
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirectFail() {
+		new SetupOutputFieldsDeclarer().declare(true, new Fields());
+	}
+
+	@Test
+	public void testDeclareStream() {
+		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareStreamFail() {
+		new SetupOutputFieldsDeclarer().declareStream(null, new Fields());
+	}
+
+	@Test
+	public void testDeclareFullStream() {
+		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareFullStreamFailNonDefaultStream() {
+		new SetupOutputFieldsDeclarer().declareStream(null, false, new Fields());
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareFullStreamFailDirect() {
+		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 5cfb151..6817593 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -61,9 +61,9 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperRawType() throws Exception {
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy1", "dummy2"));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class),
 				new String[] { Utils.DEFAULT_STREAM_ID });
@@ -71,26 +71,26 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes1() throws Exception {
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		final String[] schema = new String[26];
 		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
 		}
 		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes2() throws Exception {
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		final String[] schema = new String[26];
 		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
 		}
 		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
 	}
@@ -133,12 +133,14 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichBolt bolt = mock(IRichBolt.class);
 
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
 		wrapper.setup(mock(Output.class), taskContext);
@@ -163,6 +165,8 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final Output output = mock(Output.class);
 
@@ -209,14 +213,17 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
 		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
-		.thenReturn(flinkConfig);
+				.thenReturn(flinkConfig);
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
 
 		final IRichBolt bolt = mock(IRichBolt.class);
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
@@ -249,8 +256,11 @@ public class StormBoltWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichBolt bolt = mock(IRichBolt.class);
+
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
 		wrapper.setup(mock(Output.class), taskContext);
 
@@ -275,9 +285,9 @@ public class StormBoltWrapperTest extends AbstractTest {
 	public void testClose() throws Exception {
 		final IRichBolt bolt = mock(IRichBolt.class);
 
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index a4eea7e..77f1b05 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -22,7 +22,9 @@ import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Assert;
@@ -46,12 +48,14 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testRunExecuteFixedNumber() throws Exception {
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichSpout spout = mock(IRichSpout.class);
 		final int numberOfCalls = this.r.nextInt(50);
@@ -73,6 +77,8 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
@@ -94,11 +100,12 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
 
 		StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
 				spout);
-		spoutWrapper.setRuntimeContext(taskContext);
 
 		spoutWrapper.cancel();
 		final TestContext collector = new TestContext();

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
deleted file mode 100644
index 561939f..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-public class StormOutputFieldsDeclarerTest extends AbstractTest {
-
-	@Test
-	public void testDeclare() {
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
-
-		int numberOfAttributes = this.r.nextInt(26);
-		declarer.declare(createSchema(numberOfAttributes));
-		Assert.assertEquals(1, declarer.outputSchemas.size());
-		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
-				.intValue());
-
-		final String sid = "streamId";
-		numberOfAttributes = this.r.nextInt(26);
-		declarer.declareStream(sid, createSchema(numberOfAttributes));
-		Assert.assertEquals(2, declarer.outputSchemas.size());
-		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
-	}
-
-	private Fields createSchema(final int numberOfAttributes) {
-		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			schema.add("a" + i);
-		}
-		return new Fields(schema);
-	}
-
-	@Test
-	public void testDeclareDirect() {
-		new StormOutputFieldsDeclarer().declare(false, new Fields());
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareDirectFail() {
-		new StormOutputFieldsDeclarer().declare(true, new Fields());
-	}
-
-	@Test
-	public void testDeclareStream() {
-		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareStreamFail() {
-		new StormOutputFieldsDeclarer().declareStream(null, new Fields());
-	}
-
-	@Test
-	public void testDeclareFullStream() {
-		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareFullStreamFailNonDefaultStream() {
-		new StormOutputFieldsDeclarer().declareStream(null, false, new Fields());
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareFullStreamFailDirect() {
-		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 04dc48d..f4fb4da 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -25,9 +25,11 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
 import org.apache.flink.stormcompatibility.util.StormConfig;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -89,8 +91,12 @@ public class StormSpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+
+
 		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
 		spoutWrapper.setRuntimeContext(taskContext);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
index 7497ffc..c799d63 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
@@ -14,29 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.wrappers;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
+import org.apache.flink.stormcompatibility.api.TestTopologyBuilder;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.TestDummyBolt;
+import org.apache.flink.stormcompatibility.util.TestDummySpout;
+import org.apache.flink.stormcompatibility.util.TestSink;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import com.google.common.collect.Sets;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+@PowerMockIgnore("javax.*")
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StormWrapperSetupHelper.class)
 public class StormWrapperSetupHelperTest extends AbstractTest {
@@ -65,9 +82,9 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 			boltOrSpout = mock(IRichBolt.class);
 		}
 
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy1", "dummy2"));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
 				Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
@@ -83,13 +100,13 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 			boltOrSpout = mock(IRichBolt.class);
 		}
 
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		final String[] schema = new String[26];
 		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
 		}
 		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null);
 	}
@@ -119,9 +136,9 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 			boltOrSpout = mock(IRichBolt.class);
 		}
 
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		HashMap<String, Integer> attributes = new HashMap<String, Integer>();
 		attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
@@ -132,4 +149,167 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 						.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
 	}
 
+	@Test
+	public void testCreateTopologyContext() {
+		HashMap<String, Integer> dops = new HashMap<String, Integer>();
+		dops.put("spout1", 1);
+		dops.put("spout2", 3);
+		dops.put("bolt1", 1);
+		dops.put("bolt2", 2);
+		dops.put("sink", 1);
+
+		HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
+		taskCounter.put("spout1", 0);
+		taskCounter.put("spout2", 0);
+		taskCounter.put("bolt1", 0);
+		taskCounter.put("bolt2", 0);
+		taskCounter.put("sink", 0);
+
+		HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
+		operators.put("spout1", new TestDummySpout());
+		operators.put("spout2", new TestDummySpout());
+		operators.put("bolt1", new TestDummyBolt());
+		operators.put("bolt2", new TestDummyBolt());
+		operators.put("sink", new TestSink());
+
+		TopologyBuilder builder = new TopologyBuilder();
+
+		builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
+		builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
+		builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+		builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+		builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
+		.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+		.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+		.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+		.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+		final int maxRetry = 3;
+		int counter;
+		for (counter = 0; counter < maxRetry; ++counter) {
+			LocalCluster cluster = new LocalCluster();
+			Config c = new Config();
+			c.setNumAckers(0);
+			cluster.submitTopology("test", c, builder.createTopology());
+			Utils.sleep((counter + 1) * 5000);
+			cluster.shutdown();
+
+			if (TestSink.result.size() == 8) {
+				break;
+			}
+		}
+		Assert.assertTrue(counter < maxRetry);
+
+		TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();
+
+		flinkBuilder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
+		flinkBuilder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
+		flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+		flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+		flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
+		.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+		.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+		.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+		.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+		flinkBuilder.createTopology();
+		StormTopology stormTopology = flinkBuilder.getStormTopology();
+
+		Set<Integer> taskIds = new HashSet<Integer>();
+
+		for (TopologyContext expectedContext : TestSink.result) {
+			final String thisComponentId = expectedContext.getThisComponentId();
+			int index = taskCounter.get(thisComponentId);
+
+			StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+			when(context.getTaskName()).thenReturn(thisComponentId);
+			when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
+			when(context.getIndexOfThisSubtask()).thenReturn(index);
+			taskCounter.put(thisComponentId, ++index);
+
+			Config stormConfig = new Config();
+			stormConfig.put(StormWrapperSetupHelper.TOPOLOGY_NAME, "test");
+
+			TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
+					context, operators.get(thisComponentId), stormTopology, stormConfig);
+
+			ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
+			ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
+
+			Assert.assertNull(topologyContext.getCodeDir());
+			Assert.assertNull(common.get_json_conf());
+			Assert.assertNull(topologyContext.getExecutorData(null));
+			Assert.assertNull(topologyContext.getPIDDir());
+			Assert.assertNull(topologyContext.getResource(null));
+			Assert.assertNull(topologyContext.getSharedExecutor());
+			Assert.assertNull(expectedContext.getTaskData(null));
+			Assert.assertNull(topologyContext.getThisWorkerPort());
+
+			Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
+
+			Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs());
+			Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint());
+			Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams());
+			Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
+			Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
+					topologyContext.getComponentStreams(thisComponentId));
+			Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
+			Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
+			Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
+			Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
+			Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
+
+			for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
+				Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
+			}
+
+			for (String componentId : expectedContext.getComponentIds()) {
+				Assert.assertEquals(expectedContext.getSources(componentId),
+						topologyContext.getSources(componentId));
+				Assert.assertEquals(expectedContext.getTargets(componentId),
+						topologyContext.getTargets(componentId));
+
+				for (String streamId : expectedContext.getComponentStreams(componentId)) {
+					Assert.assertEquals(
+							expectedContext.getComponentOutputFields(componentId, streamId).toList(),
+							topologyContext.getComponentOutputFields(componentId, streamId).toList());
+				}
+			}
+
+			for (String streamId : expectedContext.getThisStreams()) {
+				Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
+						topologyContext.getThisOutputFields(streamId).toList());
+			}
+
+			HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
+			Set<Integer> allTaskIds = new HashSet<Integer>();
+			for (String componentId : expectedContext.getComponentIds()) {
+				List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
+				List<Integer> tasks = topologyContext.getComponentTasks(componentId);
+
+				Iterator<Integer> p_it = possibleTasks.iterator();
+				Iterator<Integer> t_it = tasks.iterator();
+				while(p_it.hasNext()) {
+					Assert.assertTrue(t_it.hasNext());
+					Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
+					Assert.assertTrue(allTaskIds.add(t_it.next()));
+				}
+				Assert.assertFalse(t_it.hasNext());
+			}
+
+			Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
+			Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
+
+			try {
+				topologyContext.getHooks();
+				Assert.fail();
+			} catch (UnsupportedOperationException e) { /* expected */ }
+
+			try {
+				topologyContext.getRegisteredMetricByName(null);
+				Assert.fail();
+			} catch (UnsupportedOperationException e) { /* expected */ }
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
index 64b3e28..d3776fb 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -20,7 +20,7 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Values;
 
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
 
 import java.io.IOException;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
index 6fb764d..5efff66 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.stormcompatibility.util;
 
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
 
 /**
  * Implements a Storm Spout that reads String[] data stored in the memory. The spout stops

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
index c992b6b..5f637d3 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
@@ -49,10 +49,10 @@ public class SplitBoltTopology {
 			final String[] tokens = outputPath.split(":");
 			final String outputFile = tokens[tokens.length - 1];
 			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
-			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+					.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
 		} else {
 			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
-			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+					.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
 		}
 
 		return builder;