Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:27 UTC

[02/15] flink git commit: [Storm Compatibility] Maven module restructuring and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
new file mode 100644
index 0000000..f51aba4
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public abstract class AbstractTest {
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
+
+	protected long seed;
+	protected Random r;
+
+	@Before
+	public void prepare() {
+		this.seed = System.currentTimeMillis();
+		this.r = new Random(this.seed);
+		LOG.info("Test seed: {}", this.seed);
+	}
+
+}
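
AbstractTest seeds a java.util.Random from the wall clock and logs the seed, so a failing randomized test can be re-run deterministically with the logged value. A minimal sketch of a subclass using it (hypothetical, for illustration; not part of this commit):

package org.apache.flink.storm.util;

import org.junit.Assert;
import org.junit.Test;

public class SeededRandomExampleTest extends AbstractTest {

	@Test
	public void testWithReproducibleRandomness() {
		// this.r was seeded in AbstractTest.prepare(); the seed is in the log,
		// so a failure can be replayed by fixing the seed to the logged value
		final int numberOfAttributes = this.r.nextInt(26);
		Assert.assertTrue(numberOfAttributes >= 0 && numberOfAttributes < 26);
	}
}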

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
new file mode 100644
index 0000000..1b320e5
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+public class FiniteTestSpout implements IRichSpout {
+	private static final long serialVersionUID = 7992419478267824279L;
+
+	private int numberOfOutputTuples;
+	private SpoutOutputCollector collector;
+
+	public FiniteTestSpout(final int numberOfOutputTuples) {
+		this.numberOfOutputTuples = numberOfOutputTuples;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void close() {/* nothing to do */}
+
+	@Override
+	public void activate() {/* nothing to do */}
+
+	@Override
+	public void deactivate() {/* nothing to do */}
+
+	@Override
+	public void nextTuple() {
+		if (--this.numberOfOutputTuples >= 0) {
+			this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
+		}
+	}
+
+	@Override
+	public void ack(final Object msgId) {/* nothing to do */}
+
+	@Override
+	public void fail(final Object msgId) {/* nothing to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("dummy"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}
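
FiniteTestSpout emits a fixed number of descending Integer tuples and then falls silent. A sketch of how a test could drain it with a mocked collector, in the same Mockito style the wrapper tests below use (hypothetical, for illustration):

package org.apache.flink.storm.util;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.tuple.Values;

import org.junit.Test;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class FiniteTestSpoutUsageTest extends AbstractTest {

	@Test
	public void testSpoutFallsSilentAfterLastTuple() {
		final int numberOfTuples = 1 + this.r.nextInt(10);
		final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);

		final FiniteTestSpout spout = new FiniteTestSpout(numberOfTuples);
		spout.open(null, null, collector);

		// one call more than configured; the extra call must not emit anything
		for (int i = 0; i < numberOfTuples + 1; ++i) {
			spout.nextTuple();
		}
		// the last emitted value is 0, the first is numberOfTuples - 1
		verify(collector).emit(eq(new Values(numberOfTuples - 1)));
	}
}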

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
new file mode 100644
index 0000000..17de427
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StormStreamSelectorTest {
+
+	@Test
+	public void testSelector() {
+		StormStreamSelector<Object> selector = new StormStreamSelector<Object>();
+		SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+		Iterator<String> result;
+
+		tuple.streamId = "stream1";
+		result = selector.select(tuple).iterator();
+		Assert.assertEquals("stream1", result.next());
+		Assert.assertFalse(result.hasNext());
+
+		tuple.streamId = "stream2";
+		result = selector.select(tuple).iterator();
+		Assert.assertEquals("stream2", result.next());
+		Assert.assertFalse(result.hasNext());
+
+		tuple.streamId = "stream1";
+		result = selector.select(tuple).iterator();
+		Assert.assertEquals("stream1", result.next());
+		Assert.assertFalse(result.hasNext());
+	}
+
+}
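
The selector under test routes each SplitStreamType record to exactly the stream named in its streamId field, which is what lets Flink's split/select mechanism replay Storm's multi-stream output. A behavior-equivalent sketch (an assumption drawn from the test above, not the actual StormStreamSelector source):

package org.apache.flink.storm.util;

import java.util.Collections;
import java.util.List;

// illustrative only: forwards each record to the single stream named inside it
final class EchoStreamSelector<T> {
	public List<String> select(final SplitStreamType<T> value) {
		return Collections.singletonList(value.streamId);
	}
}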

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
new file mode 100644
index 0000000..b7458df
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class TestDummyBolt implements IRichBolt {
+	private static final long serialVersionUID = 6893611247443121322L;
+
+	public final static String shuffleStreamId = "shuffleStream";
+	public final static String groupingStreamId = "groupingStream";
+
+	private boolean emit = true;
+	@SuppressWarnings("rawtypes")
+	public Map config;
+	private TopologyContext context;
+	private OutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		this.config = stormConf;
+		this.context = context;
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		if (this.context.getThisTaskIndex() == 0) {
+			this.collector.emit(shuffleStreamId, input.getValues());
+		}
+		if (this.emit) {
+			this.collector.emit(groupingStreamId, new Values("bolt", this.context));
+			this.emit = false;
+		}
+	}
+
+	@Override
+	public void cleanup() {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declareStream(shuffleStreamId, new Fields("data"));
+		declarer.declareStream(groupingStreamId, new Fields("id", "data"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
new file mode 100644
index 0000000..ed9ffff
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+public class TestDummySpout implements IRichSpout {
+	private static final long serialVersionUID = -5190945609124603118L;
+
+	public final static String spoutStreamId = "spout-stream";
+
+	private boolean emit = true;
+	@SuppressWarnings("rawtypes")
+	public Map config;
+	private TopologyContext context;
+	private SpoutOutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		this.config = conf;
+		this.context = context;
+		this.collector = collector;
+	}
+
+	@Override
+	public void close() {}
+
+	@Override
+	public void activate() {}
+
+	@Override
+	public void deactivate() {}
+
+	@Override
+	public void nextTuple() {
+		if (this.emit) {
+			this.collector.emit(new Values(this.context));
+			this.emit = false;
+		}
+	}
+
+	@Override
+	public void ack(Object msgId) {}
+
+	@Override
+	public void fail(Object msgId) {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data"));
+		declarer.declareStream(spoutStreamId, new Fields("id", "data"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
new file mode 100644
index 0000000..59939fd
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+public class TestSink implements IRichBolt {
+	private static final long serialVersionUID = 4314871456719370877L;
+
+	public final static List<TopologyContext> result = new LinkedList<TopologyContext>();
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		result.add(context);
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		if (input.size() == 1) {
+			result.add((TopologyContext) input.getValue(0));
+		} else {
+			result.add((TopologyContext) input.getValue(1));
+		}
+	}
+
+	@Override
+	public void cleanup() {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}
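
TestDummySpout, TestDummyBolt, and TestSink are the plumbing for the wrapper tests: spout and bolt each emit their TopologyContext once, and the sink records every context it receives in a static list. For orientation, this is how the three pieces would fit together as a plain Storm topology (a hypothetical sketch; the actual tests drive them through the Flink wrappers instead):

package org.apache.flink.storm.util;

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public final class DummyTopologyExample {
	public static StormTopology build() {
		final TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout", new TestDummySpout());
		builder.setBolt("bolt", new TestDummyBolt())
				.shuffleGrouping("spout", TestDummySpout.spoutStreamId);
		builder.setBolt("sink", new TestSink())
				.shuffleGrouping("bolt", TestDummyBolt.groupingStreamId);
		return builder.createTopology();
	}
}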

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
new file mode 100644
index 0000000..3d7d26b
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.BoltCollector;
+import org.apache.flink.streaming.api.operators.Output;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class BoltCollectorTest extends AbstractTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
+		for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
+			final Output flinkCollector = mock(Output.class);
+			Tuple flinkTuple = null;
+			final Values tuple = new Values();
+
+			BoltCollector<?> collector;
+
+			final String streamId = "streamId";
+			HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+			attributes.put(streamId, numberOfAttributes);
+
+			if (numberOfAttributes == -1) {
+				collector = new BoltCollector(attributes, flinkCollector);
+				tuple.add(new Integer(this.r.nextInt()));
+
+			} else {
+				collector = new BoltCollector(attributes, flinkCollector);
+				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+				for (int i = 0; i < numberOfAttributes; ++i) {
+					tuple.add(new Integer(this.r.nextInt()));
+					flinkTuple.setField(tuple.get(i), i);
+				}
+			}
+
+			final Collection anchors = mock(Collection.class);
+			final List<Integer> taskIds;
+			taskIds = collector.emit(streamId, anchors, tuple);
+
+			Assert.assertNull(taskIds);
+
+			if (numberOfAttributes == -1) {
+				verify(flinkCollector).collect(tuple.get(0));
+			} else {
+				verify(flinkCollector).collect(flinkTuple);
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testEmitDirect() {
+		new BoltCollector<Object>(mock(HashMap.class), mock(Output.class)).emitDirect(0, null,
+				null, null);
+	}
+
+}
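
The loop bounds encode the wrapper convention: a declared attribute count of -1 stands for a raw output type (the single field is forwarded unwrapped), while 0 through 25 map onto Flink's Tuple0 through Tuple25. A sketch of that mapping (an assumption drawn from this test, not the BoltCollector source):

package org.apache.flink.storm.wrappers;

import backtype.storm.tuple.Values;

import org.apache.flink.api.java.tuple.Tuple;

final class AttributeMappingSketch {
	// illustrative only: pack Storm Values into the expected Flink representation
	static Object toFlink(final Values values, final int declaredAttributes)
			throws InstantiationException, IllegalAccessException {
		if (declaredAttributes == -1) {
			return values.get(0); // raw type: forward the single field unwrapped
		}
		final Tuple t = Tuple.getTupleClass(declaredAttributes).newInstance();
		for (int i = 0; i < declaredAttributes; ++i) {
			t.setField(values.get(i), i);
		}
		return t;
	}
}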

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
new file mode 100644
index 0000000..e33fdb9
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.apache.flink.storm.wrappers.StormTuple;
+import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class})
+public class BoltWrapperTest extends AbstractTest {
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperRawType() throws Exception {
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy1", "dummy2"));
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		new BoltWrapper<Object, Object>(mock(IRichBolt.class),
+				new String[] { Utils.DEFAULT_STREAM_ID });
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperTooManyAttributes1() throws Exception {
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		final String[] schema = new String[26];
+		for (int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		new BoltWrapper<Object, Object>(mock(IRichBolt.class));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperTooManyAttributes2() throws Exception {
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		final String[] schema = new String[26];
+		for (int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		new BoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
+	}
+
+	@Test
+	public void testWrapper() throws Exception {
+		for (int i = -1; i < 26; ++i) {
+			this.testWrapper(i);
+		}
+	}
+
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	private void testWrapper(final int numberOfAttributes) throws Exception {
+		assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25));
+		Tuple flinkTuple = null;
+		String rawTuple = null;
+
+		if (numberOfAttributes == -1) {
+			rawTuple = "test";
+		} else {
+			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+		}
+
+		final String[] schema;
+		if (numberOfAttributes == -1) {
+			schema = new String[1];
+		} else {
+			schema = new String[numberOfAttributes];
+		}
+		for (int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+
+		final StreamRecord record = mock(StreamRecord.class);
+		if (numberOfAttributes == -1) {
+			when(record.getValue()).thenReturn(rawTuple);
+		} else {
+			when(record.getValue()).thenReturn(flinkTuple);
+		}
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final IRichBolt bolt = mock(IRichBolt.class);
+
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
+		wrapper.setup(mock(Output.class), taskContext);
+		wrapper.open(null);
+
+		wrapper.processElement(record);
+		if (numberOfAttributes == -1) {
+			verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
+		} else {
+			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
+		}
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testMultipleOutputStreams() throws Exception {
+		final boolean rawOutType1 = super.r.nextBoolean();
+		final boolean rawOutType2 = super.r.nextBoolean();
+
+		final StreamRecord record = mock(StreamRecord.class);
+		when(record.getValue()).thenReturn(2).thenReturn(3);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final Output output = mock(Output.class);
+
+		final TestBolt bolt = new TestBolt();
+		final HashSet<String> raw = new HashSet<String>();
+		if (rawOutType1) {
+			raw.add("stream1");
+		}
+		if (rawOutType2) {
+			raw.add("stream2");
+		}
+
+		final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null, raw);
+		wrapper.setup(output, taskContext);
+		wrapper.open(null);
+
+		final SplitStreamType splitRecord = new SplitStreamType<Integer>();
+		if (rawOutType1) {
+			splitRecord.streamId = "stream1";
+			splitRecord.value = 2;
+		} else {
+			splitRecord.streamId = "stream1";
+			splitRecord.value = new Tuple1<Integer>(2);
+		}
+		wrapper.processElement(record);
+		verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
+
+		if (rawOutType2) {
+			splitRecord.streamId = "stream2";
+			splitRecord.value = 3;
+		} else {
+			splitRecord.streamId = "stream2";
+			splitRecord.value = new Tuple1<Integer>(3);
+		}
+		wrapper.processElement(record);
+		verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testOpen() throws Exception {
+		final StormConfig stormConfig = new StormConfig();
+		final Configuration flinkConfig = new Configuration();
+
+		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+				.thenReturn(flinkConfig);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		final IRichBolt bolt = mock(IRichBolt.class);
+
+		BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+		wrapper.setup(mock(Output.class), taskContext);
+
+		// test without configuration
+		wrapper.open(null);
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+
+		// test with StormConfig
+		wrapper.open(null);
+		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+				any(OutputCollector.class));
+
+		// test with Configuration
+		final TestDummyBolt testBolt = new TestDummyBolt();
+		wrapper = new BoltWrapper<Object, Object>(testBolt);
+		wrapper.setup(mock(Output.class), taskContext);
+
+		wrapper.open(null);
+		for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+			Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testOpenSink() throws Exception {
+		final StormConfig stormConfig = new StormConfig();
+		final Configuration flinkConfig = new Configuration();
+
+		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+				.thenReturn(flinkConfig);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final IRichBolt bolt = mock(IRichBolt.class);
+
+		BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+		wrapper.setup(mock(Output.class), taskContext);
+
+		// test without configuration
+		wrapper.open(null);
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
+				isNull(OutputCollector.class));
+
+		// test with StormConfig
+		wrapper.open(null);
+		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+				isNull(OutputCollector.class));
+
+		// test with Configuration
+		final TestDummyBolt testBolt = new TestDummyBolt();
+		wrapper = new BoltWrapper<Object, Object>(testBolt);
+		wrapper.setup(mock(Output.class), taskContext);
+
+		wrapper.open(null);
+		for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+			Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testClose() throws Exception {
+		final IRichBolt bolt = mock(IRichBolt.class);
+
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+		final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		wrapper.setup(mock(Output.class), taskContext);
+
+		wrapper.close();
+		wrapper.dispose();
+
+		verify(bolt).cleanup();
+	}
+
+	private static final class TestBolt implements IRichBolt {
+		private static final long serialVersionUID = 7278692872260138758L;
+		private OutputCollector collector;
+
+		@SuppressWarnings("rawtypes")
+		@Override
+		public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+			this.collector = collector;
+		}
+
+		int counter = 0;
+		@Override
+		public void execute(backtype.storm.tuple.Tuple input) {
+			if (++counter % 2 == 1) {
+				this.collector.emit("stream1", new Values(input.getInteger(0)));
+			} else {
+				this.collector.emit("stream2", new Values(input.getInteger(0)));
+			}
+		}
+
+		@Override
+		public void cleanup() {}
+
+		@Override
+		public void declareOutputFields(OutputFieldsDeclarer declarer) {
+			declarer.declareStream("stream1", new Fields("a1"));
+			declarer.declareStream("stream2", new Fields("a2"));
+		}
+
+		@Override
+		public Map<String, Object> getComponentConfiguration() {
+			return null;
+		}
+	}
+
+
+}
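
BoltWrapper is the operator that embeds an unmodified IRichBolt into a Flink streaming topology; the tests above exercise its setup, open, and processElement paths. Roughly how it is meant to be used from the DataStream API (a sketch against the Flink 0.10 API; MyTokenizerBolt is a hypothetical stand-in for any user IRichBolt):

package org.apache.flink.storm.wrappers;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class BoltWrapperUsageSketch {
	public static void main(final String[] args) throws Exception {
		final StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		final DataStream<String> text = env.fromElements("hello world");

		// MyTokenizerBolt is a hypothetical IRichBolt emitting (word, count) pairs
		text.transform("bolt",
				TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
				new BoltWrapper<String, Tuple2<String, Integer>>(new MyTokenizerBolt()))
				.print();

		env.execute();
	}
}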

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..69d4a8e
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import java.util.HashMap;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.FlinkTopologyContext;
+import org.junit.Test;
+
+
+/*
+ * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here,
+ * because those are tested in WrapperSetupHelperTest.
+ */
+public class FlinkTopologyContextTest extends AbstractTest {
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testAddTaskHook() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.addTaskHook(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetHooks() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.getHooks();
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric1() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.registerMetric(null, (ICombiner) null, 0);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric2() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.registerMetric(null, (IReducer) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric3() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.registerMetric(null, (IMetric) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetRegisteredMetricByName() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.getRegisteredMetricByName(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetAllSubscribedState() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.setAllSubscribedState(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState1() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.setSubscribedState(null, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState2() {
+		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null)
+		.setSubscribedState(null, null, null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..4618101
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class SetupOutputFieldsDeclarerTest extends AbstractTest {
+
+	@Test
+	public void testDeclare() {
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+
+		int numberOfAttributes = this.r.nextInt(26);
+		declarer.declare(createSchema(numberOfAttributes));
+		Assert.assertEquals(1, declarer.outputSchemas.size());
+		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
+				.intValue());
+
+		final String sid = "streamId";
+		numberOfAttributes = this.r.nextInt(26);
+		declarer.declareStream(sid, createSchema(numberOfAttributes));
+		Assert.assertEquals(2, declarer.outputSchemas.size());
+		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
+	}
+
+	private Fields createSchema(final int numberOfAttributes) {
+		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			schema.add("a" + i);
+		}
+		return new Fields(schema);
+	}
+
+	@Test
+	public void testDeclareDirect() {
+		new SetupOutputFieldsDeclarer().declare(false, new Fields());
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirectFail() {
+		new SetupOutputFieldsDeclarer().declare(true, new Fields());
+	}
+
+	@Test
+	public void testDeclareStream() {
+		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareStreamFail() {
+		new SetupOutputFieldsDeclarer().declareStream(null, new Fields());
+	}
+
+	@Test
+	public void testDeclareFullStream() {
+		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareFullStreamFailNonDefaultStream() {
+		new SetupOutputFieldsDeclarer().declareStream(null, false, new Fields());
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareFullStreamFailDirect() {
+		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
new file mode 100644
index 0000000..6b60d2b
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.SpoutCollector;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class SpoutCollectorTest extends AbstractTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
+		for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
+			final SourceContext flinkCollector = mock(SourceContext.class);
+			Tuple flinkTuple = null;
+			final Values tuple = new Values();
+
+			SpoutCollector<?> collector;
+
+			final String streamId = "streamId";
+			HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+			attributes.put(streamId, numberOfAttributes);
+
+			if (numberOfAttributes == -1) {
+				collector = new SpoutCollector(attributes, flinkCollector);
+				tuple.add(new Integer(this.r.nextInt()));
+
+			} else {
+				collector = new SpoutCollector(attributes, flinkCollector);
+				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+				for (int i = 0; i < numberOfAttributes; ++i) {
+					tuple.add(new Integer(this.r.nextInt()));
+					flinkTuple.setField(tuple.get(i), i);
+				}
+			}
+
+			final List<Integer> taskIds;
+			final Object messageId = new Integer(this.r.nextInt());
+
+			taskIds = collector.emit(streamId, tuple, messageId);
+
+			Assert.assertNull(taskIds);
+
+			if (numberOfAttributes == -1) {
+				verify(flinkCollector).collect(tuple.get(0));
+			} else {
+				verify(flinkCollector).collect(flinkTuple);
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testEmitDirect() {
+		new SpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect(
+				0, null, null,
+				(Object) null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
new file mode 100644
index 0000000..227d736
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.FiniteSpout;
+import org.apache.flink.storm.util.FiniteTestSpout;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WrapperSetupHelper.class)
+public class SpoutWrapperTest extends AbstractTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testRunPrepare() throws Exception {
+		final StormConfig stormConfig = new StormConfig();
+		stormConfig.put(this.r.nextInt(), this.r.nextInt());
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setInteger("testKey", this.r.nextInt());
+
+		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+				.thenReturn(flinkConfig);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final IRichSpout spout = mock(IRichSpout.class);
+		SpoutWrapper spoutWrapper = new SpoutWrapper(spout);
+		spoutWrapper.setRuntimeContext(taskContext);
+		spoutWrapper.cancel();
+
+		// test without configuration
+		spoutWrapper.run(mock(SourceContext.class));
+		verify(spout).open(any(Map.class), any(TopologyContext.class),
+				any(SpoutOutputCollector.class));
+
+		// test with StormConfig
+		spoutWrapper.run(mock(SourceContext.class));
+		verify(spout).open(eq(stormConfig), any(TopologyContext.class),
+				any(SpoutOutputCollector.class));
+
+		// test with Configuration
+		final TestDummySpout testSpout = new TestDummySpout();
+		spoutWrapper = new SpoutWrapper(testSpout);
+		spoutWrapper.setRuntimeContext(taskContext);
+		spoutWrapper.cancel();
+
+		spoutWrapper.run(mock(SourceContext.class));
+		for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+			Assert.assertEquals(entry.getValue(), testSpout.config.get(entry.getKey()));
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testRunExecuteFixedNumber() throws Exception {
+		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments()
+				.thenReturn(declarer);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final IRichSpout spout = mock(IRichSpout.class);
+		final int numberOfCalls = this.r.nextInt(50);
+		final SpoutWrapper<?> spoutWrapper = new SpoutWrapper<Object>(spout,
+				numberOfCalls);
+		spoutWrapper.setRuntimeContext(taskContext);
+
+		spoutWrapper.run(mock(SourceContext.class));
+		verify(spout, times(numberOfCalls)).nextTuple();
+	}
+
+	@Test
+	public void testRunExecuteFinite() throws Exception {
+		final int numberOfCalls = this.r.nextInt(50);
+
+		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
+		for (int i = numberOfCalls - 1; i >= 0; --i) {
+			expectedResult.add(new Tuple1<Integer>(new Integer(i)));
+		}
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final FiniteTestSpout spout = new FiniteTestSpout(numberOfCalls);
+		final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(
+				spout, -1);
+		spoutWrapper.setRuntimeContext(taskContext);
+
+		final TestContext collector = new TestContext();
+		spoutWrapper.run(collector);
+
+		Assert.assertEquals(expectedResult, collector.result);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void runAndExecuteFiniteSpout() throws Exception {
+		final FiniteSpout stormSpout = mock(FiniteSpout.class);
+		when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(taskContext);
+
+		wrapper.run(mock(SourceContext.class));
+		verify(stormSpout, times(3)).nextTuple();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void runAndExecuteFiniteSpout2() throws Exception {
+		final FiniteSpout stormSpout = mock(FiniteSpout.class);
+		when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
+		wrapper.setRuntimeContext(taskContext);
+
+		wrapper.run(mock(SourceContext.class));
+		verify(stormSpout, never()).nextTuple();
+	}
+
+	@Test
+	public void testCancel() throws Exception {
+		final int numberOfCalls = 5 + this.r.nextInt(5);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+		when(taskContext.getTaskName()).thenReturn("name");
+
+		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+
+		final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout);
+		spoutWrapper.setRuntimeContext(taskContext);
+
+		spoutWrapper.cancel();
+		final TestContext collector = new TestContext();
+		spoutWrapper.run(collector);
+
+		Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
+	}
+
+	@Test
+	public void testClose() throws Exception {
+		final IRichSpout spout = mock(IRichSpout.class);
+		final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout);
+
+		spoutWrapper.close();
+
+		verify(spout).close();
+	}
+
+}
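
SpoutWrapper turns an unmodified IRichSpout into a Flink SourceFunction; the tests above cover its open/run/cancel/close life cycle, including the bounded-invocation and FiniteSpout modes. A usage sketch reusing FiniteTestSpout from this commit (hypothetical; the two-argument constructor bounds the number of nextTuple() calls, as in testRunExecuteFixedNumber above):

package org.apache.flink.storm.wrappers;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.FiniteTestSpout;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class SpoutWrapperUsageSketch {
	public static void main(final String[] args) throws Exception {
		final StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();

		// emit ten integers as Tuple1 records, then let the source terminate
		final DataStream<Tuple1<Integer>> input = env.addSource(
				new SpoutWrapper<Tuple1<Integer>>(new FiniteTestSpout(10), 10),
				TypeExtractor.getForObject(new Tuple1<Integer>(0)));

		input.print();
		env.execute();
	}
}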

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
new file mode 100644
index 0000000..155fcd9
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.storm.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+public class StormTupleTest extends AbstractTest {
+	private static final String fieldName = "fieldName";
+	private static final String fieldNamePojo = "member";
+
+	private int arity;
+	private int index;
+
+	@Override
+	@Before
+	public void prepare() {
+		super.prepare();
+		this.arity = 1 + r.nextInt(25);
+		this.index = r.nextInt(this.arity);
+	}
+
+	@Test
+	public void nonTupleTest() {
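+		// a non-Tuple value is exposed as a single-attribute tuple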
+		final Object flinkTuple = this.r.nextInt();
+
+		final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple, null);
+		Assert.assertSame(flinkTuple, tuple.getValue(0));
+
+		final List<Object> values = tuple.getValues();
+		Assert.assertEquals(1, values.size());
+		Assert.assertEquals(flinkTuple, values.get(0));
+	}
+
+	@Test
+	public void tupleTest() throws InstantiationException, IllegalAccessException {
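+		// all attributes of the Flink tuple must be exposed, in order, via getValues()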
+		final int numberOfAttributes = this.r.nextInt(26);
+		final Object[] data = new Object[numberOfAttributes];
+
+		final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			data[i] = this.r.nextInt();
+			flinkTuple.setField(data[i], i);
+		}
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		final List<Object> values = tuple.getValues();
+
+		Assert.assertEquals(numberOfAttributes, values.size());
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			Assert.assertEquals(flinkTuple.getField(i), values.get(i));
+		}
+
+		Assert.assertEquals(numberOfAttributes, tuple.size());
+	}
+
+	@Test
+	public void testBinary() {
+		// wrap a raw byte[] directly, like the other scalar tests; the in-tuple case is covered by testBinaryTuple
+		final byte[] flinkTuple = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(flinkTuple);
+
+		final StormTuple<byte[]> tuple = new StormTuple<byte[]>(flinkTuple, null);
+		Assert.assertSame(flinkTuple, tuple.getBinary(0));
+	}
+
+	@Test
+	public void testBoolean() {
+		final Boolean flinkTuple = this.r.nextBoolean();
+
+		final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getBoolean(0));
+	}
+
+	@Test
+	public void testByte() {
+		final Byte flinkTuple = (byte) this.r.nextInt();
+
+		final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getByte(0));
+	}
+
+	@Test
+	public void testDouble() {
+		final Double flinkTuple = this.r.nextDouble();
+
+		final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getDouble(0));
+	}
+
+	@Test
+	public void testFloat() {
+		final Float flinkTuple = this.r.nextFloat();
+
+		final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getFloat(0));
+	}
+
+	@Test
+	public void testInteger() {
+		final Integer flinkTuple = this.r.nextInt();
+
+		final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getInteger(0));
+	}
+
+	@Test
+	public void testLong() {
+		final Long flinkTuple = this.r.nextLong();
+
+		final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getLong(0));
+	}
+
+	@Test
+	public void testShort() {
+		final Short flinkTuple = (short) this.r.nextInt();
+
+		final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getShort(0));
+	}
+
+	@Test
+	public void testString() {
+		final byte[] data = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(data);
+		final String flinkTuple = new String(data);
+
+		final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple, tuple.getString(0));
+	}
+
+	@Test
+	public void testBinaryTuple() {
+		final byte[] data = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(data);
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
+	}
+
+	@Test
+	public void testBooleanTuple() {
+		final Boolean data = this.r.nextBoolean();
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index));
+	}
+
+	@Test
+	public void testByteTuple() {
+		final Byte data = (byte) this.r.nextInt();
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index));
+	}
+
+	@Test
+	public void testDoubleTuple() {
+		final Double data = this.r.nextDouble();
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index));
+	}
+
+	@Test
+	public void testFloatTuple() {
+		final Float data = this.r.nextFloat();
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index));
+	}
+
+	@Test
+	public void testIntegerTuple() {
+		final Integer data = this.r.nextInt();
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index));
+	}
+
+	@Test
+	public void testLongTuple() {
+		final Long data = this.r.nextLong();
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index));
+	}
+
+	@Test
+	public void testShortTuple() {
+		final Short data = (short) this.r.nextInt();
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index));
+	}
+
+	@Test
+	public void testStringTuple() {
+		final byte[] rawdata = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(rawdata);
+		final String data = new String(rawdata);
+
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
+	}
+
+	@Test
+	public void testContains() throws Exception {
+		Fields schema = new Fields("a1", "a2");
+		StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+				schema);
+
+		Assert.assertTrue(tuple.contains("a1"));
+		Assert.assertTrue(tuple.contains("a2"));
+		Assert.assertFalse(tuple.contains("a3"));
+	}
+
+	@Test
+	public void testGetFields() throws Exception {
+		Fields schema = new Fields();
+
+		Assert.assertSame(schema, new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+				schema).getFields());
+
+		Assert.assertNull(new StormTuple<Object>(null, schema).getFields());
+	}
+
+	@Test
+	public void testFieldIndex() throws Exception {
+		Fields schema = new Fields("a1", "a2");
+		StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+				schema);
+
+		Assert.assertEquals(0, tuple.fieldIndex("a1"));
+		Assert.assertEquals(1, tuple.fieldIndex("a2"));
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testSelect() throws Exception {
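+		// pick a random subset of the schema's attributes; select() must return exactly those values in schema order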
+		Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+		Values values = new Values();
+
+		ArrayList<String> attributeNames = new ArrayList<String>(arity);
+		ArrayList<String> filterNames = new ArrayList<String>(arity);
+
+		for (int i = 0; i < arity; ++i) {
+			tuple.setField(i, i);
+			attributeNames.add("a" + i);
+
+			if (r.nextBoolean()) {
+				filterNames.add("a" + i);
+				values.add(i);
+			}
+		}
+		Fields schema = new Fields(attributeNames);
+		Fields selector = new Fields(filterNames);
+
+		Assert.assertEquals(values, new StormTuple(tuple, schema).select(selector));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetValueByField() throws Exception {
+		Object value = mock(Object.class);
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getValueByField(fieldName));
+	}
+
+	@Test
+	public void testGetValueByFieldPojo() throws Exception {
+		Object value = mock(Object.class);
+		TestPojoMember<Object> pojo = new TestPojoMember<Object>(value);
+		StormTuple<TestPojoMember<Object>> tuple = new StormTuple<TestPojoMember<Object>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetValueByFieldPojoGetter() throws Exception {
+		Object value = mock(Object.class);
+		TestPojoGetter<Object> pojo = new TestPojoGetter<Object>(value);
+		StormTuple<TestPojoGetter<Object>> tuple = new StormTuple<TestPojoGetter<Object>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetStringByField() throws Exception {
+		String value = "stringValue";
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getStringByField(fieldName));
+	}
+
+	@Test
+	public void testGetStringByFieldPojo() throws Exception {
+		String value = "stringValue";
+		TestPojoMember<String> pojo = new TestPojoMember<String>(value);
+		StormTuple<TestPojoMember<String>> tuple = new StormTuple<TestPojoMember<String>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetStringByFieldPojoGetter() throws Exception {
+		String value = "stringValue";
+		TestPojoGetter<String> pojo = new TestPojoGetter<String>(value);
+		StormTuple<TestPojoGetter<String>> tuple = new StormTuple<TestPojoGetter<String>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetIntegerByField() throws Exception {
+		Integer value = r.nextInt();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getIntegerByField(fieldName));
+	}
+
+	@Test
+	public void testGetIntegerByFieldPojo() throws Exception {
+		Integer value = r.nextInt();
+		TestPojoMember<Integer> pojo = new TestPojoMember<Integer>(value);
+		StormTuple<TestPojoMember<Integer>> tuple = new StormTuple<TestPojoMember<Integer>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetIntegerByFieldPojoGetter() throws Exception {
+		Integer value = r.nextInt();
+		TestPojoGetter<Integer> pojo = new TestPojoGetter<Integer>(value);
+		StormTuple<TestPojoGetter<Integer>> tuple = new StormTuple<TestPojoGetter<Integer>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetLongByField() throws Exception {
+		Long value = r.nextLong();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getLongByField(fieldName));
+	}
+
+	@Test
+	public void testGetLongByFieldPojo() throws Exception {
+		Long value = r.nextLong();
+		TestPojoMember<Long> pojo = new TestPojoMember<Long>(value);
+		StormTuple<TestPojoMember<Long>> tuple = new StormTuple<TestPojoMember<Long>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetLongByFieldPojoGetter() throws Exception {
+		Long value = r.nextLong();
+		TestPojoGetter<Long> pojo = new TestPojoGetter<Long>(value);
+		StormTuple<TestPojoGetter<Long>> tuple = new StormTuple<TestPojoGetter<Long>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetBooleanByField() throws Exception {
+		Boolean value = r.nextBoolean();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertEquals(value, tuple.getBooleanByField(fieldName));
+	}
+
+	@Test
+	public void testGetBooleanByFieldPojo() throws Exception {
+		Boolean value = r.nextBoolean();
+		TestPojoMember<Boolean> pojo = new TestPojoMember<Boolean>(value);
+		StormTuple<TestPojoMember<Boolean>> tuple = new StormTuple<TestPojoMember<Boolean>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetBooleanByFieldPojoGetter() throws Exception {
+		Boolean value = r.nextBoolean();
+		TestPojoGetter<Boolean> pojo = new TestPojoGetter<Boolean>(value);
+		StormTuple<TestPojoGetter<Boolean>> tuple = new StormTuple<TestPojoGetter<Boolean>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetShortByField() throws Exception {
+		Short value = (short) r.nextInt();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getShortByField(fieldName));
+	}
+
+	@Test
+	public void testGetShortByFieldPojo() throws Exception {
+		Short value = (short) r.nextInt();
+		TestPojoMember<Short> pojo = new TestPojoMember<Short>(value);
+		StormTuple<TestPojoMember<Short>> tuple = new StormTuple<TestPojoMember<Short>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetShortByFieldPojoGetter() throws Exception {
+		Short value = (short) r.nextInt();
+		TestPojoGetter<Short> pojo = new TestPojoGetter<Short>(value);
+		StormTuple<TestPojoGetter<Short>> tuple = new StormTuple<TestPojoGetter<Short>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetByteByField() throws Exception {
+		Byte value = Byte.valueOf((byte) r.nextInt());
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getByteByField(fieldName));
+	}
+
+	@Test
+	public void testGetByteByFieldPojo() throws Exception {
+		Byte value = Byte.valueOf((byte) r.nextInt());
+		TestPojoMember<Byte> pojo = new TestPojoMember<Byte>(value);
+		StormTuple<TestPojoMember<Byte>> tuple = new StormTuple<TestPojoMember<Byte>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetByteByFieldPojoGetter() throws Exception {
+		Byte value = Byte.valueOf((byte) r.nextInt());
+		TestPojoGetter<Byte> pojo = new TestPojoGetter<Byte>(value);
+		StormTuple<TestPojoGetter<Byte>> tuple = new StormTuple<TestPojoGetter<Byte>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetDoubleByField() throws Exception {
+		Double value = r.nextDouble();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getDoubleByField(fieldName));
+	}
+
+	@Test
+	public void testGetDoubleByFieldPojo() throws Exception {
+		Double value = r.nextDouble();
+		TestPojoMember<Double> pojo = new TestPojoMember<Double>(value);
+		StormTuple<TestPojoMember<Double>> tuple = new StormTuple<TestPojoMember<Double>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetDoubleByFieldPojoGetter() throws Exception {
+		Double value = r.nextDouble();
+		TestPojoGetter<Double> pojo = new TestPojoGetter<Double>(value);
+		StormTuple<TestPojoGetter<Double>> tuple = new StormTuple<TestPojoGetter<Double>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetFloatByField() throws Exception {
+		Float value = r.nextFloat();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getFloatByField(fieldName));
+	}
+
+	@Test
+	public void testGetFloatByFieldPojo() throws Exception {
+		Float value = r.nextFloat();
+		TestPojoMember<Float> pojo = new TestPojoMember<Float>(value);
+		StormTuple<TestPojoMember<Float>> tuple = new StormTuple<TestPojoMember<Float>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetFloatByFieldPojoGetter() throws Exception {
+		Float value = r.nextFloat();
+		TestPojoGetter<Float> pojo = new TestPojoGetter<Float>(value);
+		StormTuple<TestPojoGetter<Float>> tuple = new StormTuple<TestPojoGetter<Float>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetBinaryByField() throws Exception {
+		byte[] data = new byte[1 + r.nextInt(20)];
+		r.nextBytes(data);
+		StormTuple tuple = testGetByField(arity, index, data);
+		Assert.assertSame(data, tuple.getBinaryByField(fieldName));
+	}
+
+	@Test
+	public void testGetBinaryByFieldPojo() throws Exception {
+		byte[] data = new byte[1 + r.nextInt(20)];
+		r.nextBytes(data);
+		TestPojoMember<byte[]> pojo = new TestPojoMember<byte[]>(data);
+		StormTuple<TestPojoMember<byte[]>> tuple = new StormTuple<TestPojoMember<byte[]>>(pojo,
+				null);
+		Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetBinaryByFieldPojoGetter() throws Exception {
+		byte[] data = new byte[1 + r.nextInt(20)];
+		r.nextBytes(data);
+		TestPojoGetter<byte[]> pojo = new TestPojoGetter<byte[]>(data);
+		StormTuple<TestPojoGetter<byte[]>> tuple = new StormTuple<TestPojoGetter<byte[]>>(pojo,
+				null);
+		Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private <T> StormTuple testGetByField(int arity, int index, T value)
+			throws Exception {
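+		// build a tuple of the given arity with 'value' at position 'index' and a schema mapping fieldName to that position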
+
+		Assert.assertTrue(index < arity);
+
+		Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+		tuple.setField(value, index);
+
+		ArrayList<String> attributeNames = new ArrayList<String>(arity);
+		for (int i = 0; i < arity; ++i) {
+			if (i == index) {
+				attributeNames.add(fieldName);
+			} else {
+				attributeNames.add("" + i);
+			}
+		}
+		Fields schema = new Fields(attributeNames);
+
+		return new StormTuple(tuple, schema);
+	}
+
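+	// the Storm metadata accessors below are not supported by the Flink wrapper and must throw UnsupportedOperationException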
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceGlobalStreamid() {
+		new StormTuple<Object>(null, null).getSourceGlobalStreamid();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceComponent() {
+		new StormTuple<Object>(null, null).getSourceComponent();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceTask() {
+		new StormTuple<Object>(null, null).getSourceTask();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceStreamId() {
+		new StormTuple<Object>(null, null).getSourceStreamId();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetMessageId() {
+		new StormTuple<Object>(null, null).getMessageId();
+	}
+
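+	// POJO exposing its value through a public field, for testing field-based access by name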
+	public static class TestPojoMember<T> {
+		public T member;
+
+		public TestPojoMember(T value) {
+			this.member = value;
+		}
+	}
+
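+	// POJO exposing its value only through a getter, for testing getter-based access by name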
+	public static class TestPojoGetter<T> {
+		private T member;
+
+		public TestPojoGetter(T value) {
+			this.member = value;
+		}
+
+		public T getMember() {
+			return this.member;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
new file mode 100644
index 0000000..4c4749a
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.LinkedList;
+
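+// simple SourceContext that records every emitted tuple so tests can assert on the collected sequence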
+class TestContext implements SourceContext<Tuple1<Integer>> {
+	public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
+
+	public TestContext() {
+	}
+
+	@Override
+	public void collect(final Tuple1<Integer> record) {
+		this.result.add(record.copy());
+	}
+
+	@Override
+	public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {
+		this.result.add(element.copy());
+	}
+
+	@Override
+	public void emitWatermark(Watermark mark) {
+		// ignore it
+	}
+
+	@Override
+	public Object getCheckpointLock() {
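+		// no lock needed; checkpointing is not exercised by these tests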
+		return null;
+	}
+
+	@Override
+	public void close() {
+		// nothing to release
+	}
+}