Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:35 UTC

[10/15] flink git commit: [Storm Compatibility] Maven module restructuring and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *
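
For orientation: the diffs below still use the old package path. Under the rename named in the subject ("stormcompatibility" to "storm"), imports change roughly as sketched here; the new path is inferred from the subject line and is not shown in this patch.

// old path, as it appears in the deleted files below
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;

// new path, inferred from the package rename in the subject line
import org.apache.flink.storm.wrappers.StormBoltWrapper;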

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
deleted file mode 100644
index 6817593..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.HashSet;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.same;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
-public class StormBoltWrapperTest extends AbstractTest {
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testWrapperRawType() throws Exception {
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		declarer.declare(new Fields("dummy1", "dummy2"));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class),
-				new String[] { Utils.DEFAULT_STREAM_ID });
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testWrapperToManyAttributes1() throws Exception {
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		final String[] schema = new String[26];
-		for (int i = 0; i < schema.length; ++i) {
-			schema[i] = "a" + i;
-		}
-		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testWrapperToManyAttributes2() throws Exception {
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		final String[] schema = new String[26];
-		for (int i = 0; i < schema.length; ++i) {
-			schema[i] = "a" + i;
-		}
-		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
-	}
-
-	@Test
-	public void testWrapper() throws Exception {
-		for (int i = -1; i < 26; ++i) {
-			this.testWrapper(i);
-		}
-	}
-
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	private void testWrapper(final int numberOfAttributes) throws Exception {
-		assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25));
-		Tuple flinkTuple = null;
-		String rawTuple = null;
-
-		if (numberOfAttributes == -1) {
-			rawTuple = "test";
-		} else {
-			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-		}
-
-		final String[] schema;
-		if (numberOfAttributes == -1) {
-			schema = new String[1];
-		} else {
-			schema = new String[numberOfAttributes];
-		}
-		for (int i = 0; i < schema.length; ++i) {
-			schema[i] = "a" + i;
-		}
-
-		final StreamRecord record = mock(StreamRecord.class);
-		if (numberOfAttributes == -1) {
-			when(record.getValue()).thenReturn(rawTuple);
-		} else {
-			when(record.getValue()).thenReturn(flinkTuple);
-		}
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final IRichBolt bolt = mock(IRichBolt.class);
-
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
-		wrapper.setup(mock(Output.class), taskContext);
-		wrapper.open(null);
-
-		wrapper.processElement(record);
-		if (numberOfAttributes == -1) {
-			verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
-		} else {
-			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
-		}
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testMultipleOutputStreams() throws Exception {
-		final boolean rawOutType1 = super.r.nextBoolean();
-		final boolean rawOutType2 = super.r.nextBoolean();
-
-		final StreamRecord record = mock(StreamRecord.class);
-		when(record.getValue()).thenReturn(2).thenReturn(3);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final Output output = mock(Output.class);
-
-		final TestBolt bolt = new TestBolt();
-		final HashSet<String> raw = new HashSet<String>();
-		if (rawOutType1) {
-			raw.add("stream1");
-		}
-		if (rawOutType2) {
-			raw.add("stream2");
-		}
-
-		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null, raw);
-		wrapper.setup(output, taskContext);
-		wrapper.open(null);
-
-		final SplitStreamType splitRecord = new SplitStreamType<Integer>();
-		if (rawOutType1) {
-			splitRecord.streamId = "stream1";
-			splitRecord.value = 2;
-		} else {
-			splitRecord.streamId = "stream1";
-			splitRecord.value = new Tuple1<Integer>(2);
-		}
-		wrapper.processElement(record);
-		verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
-
-		if (rawOutType2) {
-			splitRecord.streamId = "stream2";
-			splitRecord.value = 3;
-		} else {
-			splitRecord.streamId = "stream2";
-			splitRecord.value = new Tuple1<Integer>(3);
-		}
-		wrapper.processElement(record);
-		verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testOpen() throws Exception {
-		final StormConfig stormConfig = new StormConfig();
-		final Configuration flinkConfig = new Configuration();
-
-		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
-		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
-				.thenReturn(flinkConfig);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		declarer.declare(new Fields("dummy"));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-
-		final IRichBolt bolt = mock(IRichBolt.class);
-		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-		wrapper.setup(mock(Output.class), taskContext);
-
-		// test without configuration
-		wrapper.open(null);
-		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
-
-		// test with StormConfig
-		wrapper.open(null);
-		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
-				any(OutputCollector.class));
-
-		// test with Configuration
-		wrapper.open(null);
-		verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
-				any(OutputCollector.class));
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testOpenSink() throws Exception {
-		final StormConfig stormConfig = new StormConfig();
-		final Configuration flinkConfig = new Configuration();
-
-		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
-		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
-		.thenReturn(flinkConfig);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final IRichBolt bolt = mock(IRichBolt.class);
-
-		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-		wrapper.setup(mock(Output.class), taskContext);
-
-		// test without configuration
-		wrapper.open(null);
-		verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
-				isNull(OutputCollector.class));
-
-		// test with StormConfig
-		wrapper.open(null);
-		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
-				isNull(OutputCollector.class));
-
-		// test with Configuration
-		wrapper.open(null);
-		verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
-				isNull(OutputCollector.class));
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testClose() throws Exception {
-		final IRichBolt bolt = mock(IRichBolt.class);
-
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		declarer.declare(new Fields("dummy"));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		wrapper.setup(mock(Output.class), taskContext);
-
-		wrapper.close();
-		wrapper.dispose();
-
-		verify(bolt).cleanup();
-	}
-
-	private static final class TestBolt implements IRichBolt {
-		private static final long serialVersionUID = 7278692872260138758L;
-		private OutputCollector collector;
-
-		@SuppressWarnings("rawtypes")
-		@Override
-		public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-			this.collector = collector;
-		}
-
-		int counter = 0;
-		@Override
-		public void execute(backtype.storm.tuple.Tuple input) {
-			if (++counter % 2 == 1) {
-				this.collector.emit("stream1", new Values(input.getInteger(0)));
-			} else {
-				this.collector.emit("stream2", new Values(input.getInteger(0)));
-			}
-		}
-
-		@Override
-		public void cleanup() {}
-
-		@Override
-		public void declareOutputFields(OutputFieldsDeclarer declarer) {
-			declarer.declareStream("stream1", new Fields("a1"));
-			declarer.declareStream("stream2", new Fields("a2"));
-		}
-
-		@Override
-		public Map<String, Object> getComponentConfiguration() {
-			return null;
-		}
-	}
-
-
-}
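
For readers skimming the deleted StormBoltWrapperTest above, its core can be condensed into one lifecycle check. The sketch below reuses only calls and stubs that appear in the diff (setup, open, processElement, close, dispose); the class name is illustrative and this is a sketch, not part of the commit.

package org.apache.flink.stormcompatibility.wrappers;

import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest({ StreamRecordSerializer.class, StormWrapperSetupHelper.class })
public class BoltWrapperLifecycleSketch {

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	public void lifecycle() throws Exception {
		// Stub a one-attribute output schema, as the deleted test did.
		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
		declarer.declare(new Fields("a0"));
		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);

		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
		when(taskContext.getTaskName()).thenReturn("name");

		final Tuple flinkTuple = Tuple.getTupleClass(1).newInstance();
		final StreamRecord record = mock(StreamRecord.class);
		when(record.getValue()).thenReturn(flinkTuple);

		final IRichBolt bolt = mock(IRichBolt.class);
		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);

		// setup -> open -> processElement -> close/dispose
		wrapper.setup(mock(Output.class), taskContext);
		wrapper.open(null);                // delegates to bolt.prepare(...)
		wrapper.processElement(record);    // delegates to bolt.execute(new StormTuple(...))
		verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));

		wrapper.close();
		wrapper.dispose();                 // delegates to bolt.cleanup()
		verify(bolt).cleanup();
	}
}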

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
deleted file mode 100644
index 77f1b05..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.LinkedList;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormWrapperSetupHelper.class)
-public class StormFiniteSpoutWrapperTest extends AbstractTest {
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testRunExecuteFixedNumber() throws Exception {
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		declarer.declare(new Fields("dummy"));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final IRichSpout spout = mock(IRichSpout.class);
-		final int numberOfCalls = this.r.nextInt(50);
-		final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
-		spoutWrapper.setRuntimeContext(taskContext);
-
-		spoutWrapper.run(mock(SourceContext.class));
-		verify(spout, times(numberOfCalls)).nextTuple();
-	}
-
-	@Test
-	public void testRunExecute() throws Exception {
-		final int numberOfCalls = this.r.nextInt(50);
-
-		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
-		for (int i = numberOfCalls - 1; i >= 0; --i) {
-			expectedResult.add(new Tuple1<Integer>(new Integer(i)));
-		}
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
-		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
-				spout);
-		spoutWrapper.setRuntimeContext(taskContext);
-
-		final TestContext collector = new TestContext();
-		spoutWrapper.run(collector);
-
-		Assert.assertEquals(expectedResult, collector.result);
-	}
-
-	@Test
-	public void testCancel() throws Exception {
-		final int numberOfCalls = 5 + this.r.nextInt(5);
-
-		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
-		expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
-
-		StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
-		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
-				spout);
-
-		spoutWrapper.cancel();
-		final TestContext collector = new TestContext();
-		spoutWrapper.run(collector);
-
-		Assert.assertEquals(expectedResult, collector.result);
-	}
-
-}
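
The deleted StormFiniteSpoutWrapperTest above reduces to one behavior: run() drains a finite spout completely and forwards every emitted Tuple1 to the source context. A condensed sketch under the same assumptions (FiniteTestSpout and TestContext are the test utilities referenced in the diff; the class name is illustrative):

package org.apache.flink.stormcompatibility.wrappers;

import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.LinkedList;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class FiniteSpoutDrainSketch {

	@Test
	public void drainsSpoutUntilExhausted() throws Exception {
		final int numberOfCalls = 10;

		// FiniteTestSpout emits Tuple1<Integer> values counting down to zero.
		final LinkedList<Tuple1<Integer>> expected = new LinkedList<Tuple1<Integer>>();
		for (int i = numberOfCalls - 1; i >= 0; --i) {
			expected.add(new Tuple1<Integer>(i));
		}

		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
		when(taskContext.getTaskName()).thenReturn("name");

		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
		final StormFiniteSpoutWrapper<Tuple1<Integer>> wrapper =
				new StormFiniteSpoutWrapper<Tuple1<Integer>>(spout);
		wrapper.setRuntimeContext(taskContext);

		// run() keeps calling nextTuple() until the spout is exhausted.
		final TestContext collector = new TestContext();
		wrapper.run(collector);
		Assert.assertEquals(expected, collector.result);
	}
}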

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
deleted file mode 100644
index 36ed58a..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Values;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class StormSpoutCollectorTest extends AbstractTest {
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
-		for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
-			final SourceContext flinkCollector = mock(SourceContext.class);
-			Tuple flinkTuple = null;
-			final Values tuple = new Values();
-
-			StormSpoutCollector<?> collector;
-
-			final String streamId = "streamId";
-			HashMap<String, Integer> attributes = new HashMap<String, Integer>();
-			attributes.put(streamId, numberOfAttributes);
-
-			if (numberOfAttributes == -1) {
-				collector = new StormSpoutCollector(attributes, flinkCollector);
-				tuple.add(new Integer(this.r.nextInt()));
-
-			} else {
-				collector = new StormSpoutCollector(attributes, flinkCollector);
-				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-
-				for (int i = 0; i < numberOfAttributes; ++i) {
-					tuple.add(new Integer(this.r.nextInt()));
-					flinkTuple.setField(tuple.get(i), i);
-				}
-			}
-
-			final List<Integer> taskIds;
-			final Object messageId = new Integer(this.r.nextInt());
-
-			taskIds = collector.emit(streamId, tuple, messageId);
-
-			Assert.assertNull(taskIds);
-
-			if (numberOfAttributes == -1) {
-				verify(flinkCollector).collect(tuple.get(0));
-			} else {
-				verify(flinkCollector).collect(flinkTuple);
-			}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testReportError() {
-		new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class))
-				.reportError(null);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testEmitDirect() {
-		new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect(
-				0, null, null,
-				(Object) null);
-	}
-
-}
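
The deleted StormSpoutCollectorTest above pins down one mapping: the per-stream attribute count decides the arity of the Flink tuple built from an emitted Storm Values list, and emit() always returns null because receiving task IDs are unknown. A minimal sketch of the tuple case, using only calls shown in the diff and relying on field-wise tuple equality for the verification (the class name is illustrative):

package org.apache.flink.stormcompatibility.wrappers;

import backtype.storm.tuple.Values;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class SpoutCollectorMappingSketch {

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	public void stormValuesBecomeFlinkTuples() throws Exception {
		final SourceContext flinkCollector = mock(SourceContext.class);

		// Declare two output attributes for stream "s".
		final HashMap<String, Integer> attributes = new HashMap<String, Integer>();
		attributes.put("s", 2);
		final StormSpoutCollector<?> collector = new StormSpoutCollector(attributes, flinkCollector);

		// Build the equivalent Flink tuple by hand for the verification below.
		final Tuple flinkTuple = Tuple.getTupleClass(2).newInstance();
		flinkTuple.setField(1, 0);
		flinkTuple.setField(2, 1);

		// emit(...) returns null: receiving task IDs cannot be reported.
		final Object messageId = 42;
		final List<Integer> taskIds = collector.emit("s", new Values(1, 2), messageId);
		Assert.assertNull(taskIds);

		// The Storm Values list arrives at the Flink source context as one Tuple2.
		verify(flinkCollector).collect(flinkTuple);
	}
}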

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
deleted file mode 100644
index f4fb4da..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.LinkedList;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormWrapperSetupHelper.class)
-public class StormSpoutWrapperTest extends AbstractTest {
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testRunPrepare() throws Exception {
-		final StormConfig stormConfig = new StormConfig();
-		final Configuration flinkConfig = new Configuration();
-
-		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
-		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
-				.thenReturn(flinkConfig);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
-
-		final IRichSpout spout = mock(IRichSpout.class);
-		final StormSpoutWrapper spoutWrapper = new StormSpoutWrapper(spout);
-		spoutWrapper.setRuntimeContext(taskContext);
-		spoutWrapper.isRunning = false;
-
-		// test without configuration
-		spoutWrapper.run(mock(SourceContext.class));
-		verify(spout).open(any(Map.class), any(TopologyContext.class),
-				any(SpoutOutputCollector.class));
-
-		// test with StormConfig
-		spoutWrapper.run(mock(SourceContext.class));
-		verify(spout).open(same(stormConfig), any(TopologyContext.class),
-				any(SpoutOutputCollector.class));
-
-		// test with Configuration
-		spoutWrapper.run(mock(SourceContext.class));
-		verify(spout, times(3)).open(eq(flinkConfig.toMap()), any(TopologyContext.class),
-				any(SpoutOutputCollector.class));
-	}
-
-	@Test
-	public void testRunExecuteCancelInfinite() throws Exception {
-		final int numberOfCalls = 5 + this.r.nextInt(5);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
-
-
-		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
-		spoutWrapper.setRuntimeContext(taskContext);
-
-		spoutWrapper.cancel();
-		final TestContext collector = new TestContext();
-		spoutWrapper.run(collector);
-
-		Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
-	}
-
-	@Test
-	public void testClose() throws Exception {
-		final IRichSpout spout = mock(IRichSpout.class);
-		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
-
-		spoutWrapper.close();
-
-		verify(spout).close();
-	}
-
-}
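
The first test in the deleted StormSpoutWrapperTest above documents how configuration reaches spout.open(): with no global job parameters some default Map is passed, a StormConfig instance is handed through unchanged, and a plain Flink Configuration is converted with toMap(). Condensed from the diff with the same stubbing order (the class name is illustrative):

package org.apache.flink.stormcompatibility.wrappers;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Map;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class SpoutConfigPrecedenceSketch {

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	public void configIsForwardedToOpen() throws Exception {
		final StormConfig stormConfig = new StormConfig();
		final Configuration flinkConfig = new Configuration();

		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
				.thenReturn(flinkConfig);

		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);

		final IRichSpout spout = mock(IRichSpout.class);
		final StormSpoutWrapper spoutWrapper = new StormSpoutWrapper(spout);
		spoutWrapper.setRuntimeContext(taskContext);
		spoutWrapper.isRunning = false; // make run() return right after open()

		// no global job parameters: open() receives a default Map
		spoutWrapper.run(mock(SourceContext.class));
		verify(spout).open(any(Map.class), any(TopologyContext.class),
				any(SpoutOutputCollector.class));

		// StormConfig: handed through as the very same instance
		spoutWrapper.run(mock(SourceContext.class));
		verify(spout).open(same(stormConfig), any(TopologyContext.class),
				any(SpoutOutputCollector.class));

		// Flink Configuration: converted with toMap()
		spoutWrapper.run(mock(SourceContext.class));
		verify(spout, times(3)).open(eq(flinkConfig.toMap()), any(TopologyContext.class),
				any(SpoutOutputCollector.class));
	}
}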

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
deleted file mode 100644
index 06d5399..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
+++ /dev/null
@@ -1,659 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-
-public class StormTupleTest extends AbstractTest {
-	private static final String fieldName = "fieldName";
-	private static final String fieldNamePojo = "member";
-
-	private int arity, index;
-
-	@Override
-	@Before
-	public void prepare() {
-		super.prepare();
-		this.arity = 1 + r.nextInt(25);
-		this.index = r.nextInt(this.arity);
-	}
-
-	@Test
-	public void nonTupleTest() {
-		final Object flinkTuple = this.r.nextInt();
-
-		final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple, null);
-		Assert.assertSame(flinkTuple, tuple.getValue(0));
-
-		final List<Object> values = tuple.getValues();
-		Assert.assertEquals(1, values.size());
-		Assert.assertEquals(flinkTuple, values.get(0));
-	}
-
-	@Test
-	public void tupleTest() throws InstantiationException, IllegalAccessException {
-		final int numberOfAttributes = this.r.nextInt(26);
-		final Object[] data = new Object[numberOfAttributes];
-
-		final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			data[i] = this.r.nextInt();
-			flinkTuple.setField(data[i], i);
-		}
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		final List<Object> values = tuple.getValues();
-
-		Assert.assertEquals(numberOfAttributes, values.size());
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			Assert.assertEquals(flinkTuple.getField(i), values.get(i));
-		}
-
-		Assert.assertEquals(numberOfAttributes, tuple.size());
-	}
-
-	@Test
-	public void testBinary() {
-		final byte[] data = new byte[this.r.nextInt(15)];
-		this.r.nextBytes(data);
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
-	}
-
-	@Test
-	public void testBoolean() {
-		final Boolean flinkTuple = this.r.nextBoolean();
-
-		final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getBoolean(0));
-	}
-
-	@Test
-	public void testByte() {
-		final Byte flinkTuple = (byte) this.r.nextInt();
-
-		final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getByte(0));
-	}
-
-	@Test
-	public void testDouble() {
-		final Double flinkTuple = this.r.nextDouble();
-
-		final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getDouble(0));
-	}
-
-	@Test
-	public void testFloat() {
-		final Float flinkTuple = this.r.nextFloat();
-
-		final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getFloat(0));
-	}
-
-	@Test
-	public void testInteger() {
-		final Integer flinkTuple = this.r.nextInt();
-
-		final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getInteger(0));
-	}
-
-	@Test
-	public void testLong() {
-		final Long flinkTuple = this.r.nextLong();
-
-		final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getLong(0));
-	}
-
-	@Test
-	public void testShort() {
-		final Short flinkTuple = (short) this.r.nextInt();
-
-		final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getShort(0));
-	}
-
-	@Test
-	public void testString() {
-		final byte[] data = new byte[this.r.nextInt(15)];
-		this.r.nextBytes(data);
-		final String flinkTuple = new String(data);
-
-		final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple, tuple.getString(0));
-	}
-
-	@Test
-	public void testBinaryTuple() {
-		final byte[] data = new byte[this.r.nextInt(15)];
-		this.r.nextBytes(data);
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
-	}
-
-	@Test
-	public void testBooleanTuple() {
-		final Boolean data = this.r.nextBoolean();
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index));
-	}
-
-	@Test
-	public void testByteTuple() {
-		final Byte data = (byte) this.r.nextInt();
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index));
-	}
-
-	@Test
-	public void testDoubleTuple() {
-		final Double data = this.r.nextDouble();
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index));
-	}
-
-	@Test
-	public void testFloatTuple() {
-		final Float data = this.r.nextFloat();
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index));
-	}
-
-	@Test
-	public void testIntegerTuple() {
-		final Integer data = this.r.nextInt();
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index));
-	}
-
-	@Test
-	public void testLongTuple() {
-		final Long data = this.r.nextLong();
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index));
-	}
-
-	@Test
-	public void testShortTuple() {
-		final Short data = (short) this.r.nextInt();
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index));
-	}
-
-	@Test
-	public void testStringTuple() {
-		final byte[] rawdata = new byte[this.r.nextInt(15)];
-		this.r.nextBytes(rawdata);
-		final String data = new String(rawdata);
-
-		final int index = this.r.nextInt(5);
-		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
-		flinkTuple.setField(data, index);
-
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
-		Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
-	}
-
-	@Test
-	public void testContains() throws Exception {
-		Fields schema = new Fields("a1", "a2");
-		StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
-				schema);
-
-		Assert.assertTrue(tuple.contains("a1"));
-		Assert.assertTrue(tuple.contains("a2"));
-		Assert.assertFalse(tuple.contains("a3"));
-	}
-
-	@Test
-	public void testGetFields() throws Exception {
-		Fields schema = new Fields();
-
-		Assert.assertSame(schema, new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
-				schema).getFields());
-
-		Assert.assertSame(null, new StormTuple<Object>(null, schema).getFields());
-
-	}
-
-	@Test
-	public void testFieldIndex() throws Exception {
-		Fields schema = new Fields("a1", "a2");
-		StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
-				schema);
-
-		Assert.assertEquals(0, tuple.fieldIndex("a1"));
-		Assert.assertEquals(1, tuple.fieldIndex("a2"));
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testSelect() throws Exception {
-		Tuple tuple = Tuple.getTupleClass(arity).newInstance();
-		Values values = new Values();
-
-		ArrayList<String> attributeNames = new ArrayList<String>(arity);
-		ArrayList<String> filterNames = new ArrayList<String>(arity);
-
-		for (int i = 0; i < arity; ++i) {
-			tuple.setField(i, i);
-			attributeNames.add("a" + i);
-
-			if (r.nextBoolean()) {
-				filterNames.add("a" + i);
-				values.add(i);
-			}
-		}
-		Fields schema = new Fields(attributeNames);
-		Fields selector = new Fields(filterNames);
-
-		Assert.assertEquals(values, new StormTuple(tuple, schema).select(selector));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetValueByField() throws Exception {
-		Object value = mock(Object.class);
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getValueByField(fieldName));
-
-	}
-
-	@Test
-	public void testGetValueByFieldPojo() throws Exception {
-		Object value = mock(Object.class);
-		TestPojoMember<Object> pojo = new TestPojoMember<Object>(value);
-		StormTuple<TestPojoMember<Object>> tuple = new StormTuple<TestPojoMember<Object>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetValueByFieldPojoGetter() throws Exception {
-		Object value = mock(Object.class);
-		TestPojoGetter<Object> pojo = new TestPojoGetter<Object>(value);
-		StormTuple<TestPojoGetter<Object>> tuple = new StormTuple<TestPojoGetter<Object>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetStringByField() throws Exception {
-		String value = "stringValue";
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getStringByField(fieldName));
-	}
-
-	@Test
-	public void testGetStringByFieldPojo() throws Exception {
-		String value = "stringValue";
-		TestPojoMember<String> pojo = new TestPojoMember<String>(value);
-		StormTuple<TestPojoMember<String>> tuple = new StormTuple<TestPojoMember<String>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetStringByFieldPojoGetter() throws Exception {
-		String value = "stringValue";
-		TestPojoGetter<String> pojo = new TestPojoGetter<String>(value);
-		StormTuple<TestPojoGetter<String>> tuple = new StormTuple<TestPojoGetter<String>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetIntegerByField() throws Exception {
-		Integer value = r.nextInt();
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getIntegerByField(fieldName));
-	}
-
-	@Test
-	public void testGetIntegerByFieldPojo() throws Exception {
-		Integer value = r.nextInt();
-		TestPojoMember<Integer> pojo = new TestPojoMember<Integer>(value);
-		StormTuple<TestPojoMember<Integer>> tuple = new StormTuple<TestPojoMember<Integer>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetIntegerByFieldPojoGetter() throws Exception {
-		Integer value = r.nextInt();
-		TestPojoGetter<Integer> pojo = new TestPojoGetter<Integer>(value);
-		StormTuple<TestPojoGetter<Integer>> tuple = new StormTuple<TestPojoGetter<Integer>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetLongByField() throws Exception {
-		Long value = r.nextLong();
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getLongByField(fieldName));
-	}
-
-	@Test
-	public void testGetLongByFieldPojo() throws Exception {
-		Long value = r.nextLong();
-		TestPojoMember<Long> pojo = new TestPojoMember<Long>(value);
-		StormTuple<TestPojoMember<Long>> tuple = new StormTuple<TestPojoMember<Long>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetLongByFieldPojoGetter() throws Exception {
-		Long value = r.nextLong();
-		TestPojoGetter<Long> pojo = new TestPojoGetter<Long>(value);
-		StormTuple<TestPojoGetter<Long>> tuple = new StormTuple<TestPojoGetter<Long>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetBooleanByField() throws Exception {
-		Boolean value = r.nextBoolean();
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertEquals(value, tuple.getBooleanByField(fieldName));
-	}
-
-	@Test
-	public void testGetBooleanByFieldPojo() throws Exception {
-		Boolean value = r.nextBoolean();
-		TestPojoMember<Boolean> pojo = new TestPojoMember<Boolean>(value);
-		StormTuple<TestPojoMember<Boolean>> tuple = new StormTuple<TestPojoMember<Boolean>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetBooleanByFieldPojoGetter() throws Exception {
-		Boolean value = r.nextBoolean();
-		TestPojoGetter<Boolean> pojo = new TestPojoGetter<Boolean>(value);
-		StormTuple<TestPojoGetter<Boolean>> tuple = new StormTuple<TestPojoGetter<Boolean>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetShortByField() throws Exception {
-		Short value = (short) r.nextInt();
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getShortByField(fieldName));
-	}
-
-	@Test
-	public void testGetShortByFieldPojo() throws Exception {
-		Short value = (short) r.nextInt();
-		TestPojoMember<Short> pojo = new TestPojoMember<Short>(value);
-		StormTuple<TestPojoMember<Short>> tuple = new StormTuple<TestPojoMember<Short>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetShortByFieldPojoGetter() throws Exception {
-		Short value = (short) r.nextInt();
-		TestPojoGetter<Short> pojo = new TestPojoGetter<Short>(value);
-		StormTuple<TestPojoGetter<Short>> tuple = new StormTuple<TestPojoGetter<Short>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetByteByField() throws Exception {
-		Byte value = new Byte((byte) r.nextInt());
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getByteByField(fieldName));
-	}
-
-	@Test
-	public void testGetByteByFieldPojo() throws Exception {
-		Byte value = new Byte((byte) r.nextInt());
-		TestPojoMember<Byte> pojo = new TestPojoMember<Byte>(value);
-		StormTuple<TestPojoMember<Byte>> tuple = new StormTuple<TestPojoMember<Byte>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetByteByFieldPojoGetter() throws Exception {
-		Byte value = new Byte((byte) r.nextInt());
-		TestPojoGetter<Byte> pojo = new TestPojoGetter<Byte>(value);
-		StormTuple<TestPojoGetter<Byte>> tuple = new StormTuple<TestPojoGetter<Byte>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetDoubleByField() throws Exception {
-		Double value = r.nextDouble();
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getDoubleByField(fieldName));
-	}
-
-	@Test
-	public void testGetDoubleByFieldPojo() throws Exception {
-		Double value = r.nextDouble();
-		TestPojoMember<Double> pojo = new TestPojoMember<Double>(value);
-		StormTuple<TestPojoMember<Double>> tuple = new StormTuple<TestPojoMember<Double>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetDoubleByFieldPojoGetter() throws Exception {
-		Double value = r.nextDouble();
-		TestPojoGetter<Double> pojo = new TestPojoGetter<Double>(value);
-		StormTuple<TestPojoGetter<Double>> tuple = new StormTuple<TestPojoGetter<Double>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetFloatByField() throws Exception {
-		Float value = r.nextFloat();
-		StormTuple tuple = testGetByField(arity, index, value);
-		Assert.assertSame(value, tuple.getFloatByField(fieldName));
-	}
-
-	@Test
-	public void testGetFloatByFieldPojo() throws Exception {
-		Float value = r.nextFloat();
-		TestPojoMember<Float> pojo = new TestPojoMember<Float>(value);
-		StormTuple<TestPojoMember<Float>> tuple = new StormTuple<TestPojoMember<Float>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetFloatByFieldPojoGetter() throws Exception {
-		Float value = r.nextFloat();
-		TestPojoGetter<Float> pojo = new TestPojoGetter<Float>(value);
-		StormTuple<TestPojoGetter<Float>> tuple = new StormTuple<TestPojoGetter<Float>>(pojo,
-				null);
-		Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test
-	public void testGetBinaryByField() throws Exception {
-		byte[] data = new byte[1 + r.nextInt(20)];
-		r.nextBytes(data);
-		StormTuple tuple = testGetByField(arity, index, data);
-		Assert.assertSame(data, tuple.getBinaryByField(fieldName));
-	}
-
-	@Test
-	public void testGetBinaryFieldPojo() throws Exception {
-		byte[] data = new byte[1 + r.nextInt(20)];
-		r.nextBytes(data);
-		TestPojoMember<byte[]> pojo = new TestPojoMember<byte[]>(data);
-		StormTuple<TestPojoMember<byte[]>> tuple = new StormTuple<TestPojoMember<byte[]>>(pojo,
-				null);
-		Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
-	}
-
-	@Test
-	public void testGetBinaryByFieldPojoGetter() throws Exception {
-		byte[] data = new byte[1 + r.nextInt(20)];
-		r.nextBytes(data);
-		TestPojoGetter<byte[]> pojo = new TestPojoGetter<byte[]>(data);
-		StormTuple<TestPojoGetter<byte[]>> tuple = new StormTuple<TestPojoGetter<byte[]>>(pojo,
-				null);
-		Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private <T> StormTuple testGetByField(int arity, int index, T value)
-			throws Exception {
-
-		assert (index < arity);
-
-		Tuple tuple = Tuple.getTupleClass(arity).newInstance();
-		tuple.setField(value, index);
-
-		ArrayList<String> attributeNames = new ArrayList<String>(arity);
-		for(int i = 0; i < arity; ++i) {
-			if(i == index) {
-				attributeNames.add(fieldName);
-			} else {
-				attributeNames.add("" + i);
-			}
-		}
-		Fields schema = new Fields(attributeNames);
-
-		return new StormTuple(tuple, schema);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetSourceGlobalStreamid() {
-		new StormTuple<Object>(null, null).getSourceGlobalStreamid();
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetSourceComponent() {
-		new StormTuple<Object>(null, null).getSourceComponent();
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetSourceTask() {
-		new StormTuple<Object>(null, null).getSourceTask();
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetSourceStreamId() {
-		new StormTuple<Object>(null, null).getSourceStreamId();
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetMessageId() {
-		new StormTuple<Object>(null, null).getMessageId();
-	}
-
-	public static class TestPojoMember<T> {
-		public T member;
-
-		public TestPojoMember(T value) {
-			this.member = value;
-		}
-	}
-
-	public static class TestPojoGetter<T> {
-		private T member;
-
-		public TestPojoGetter(T value) {
-			this.member = value;
-		}
-
-		public T getMember() {
-			return this.member;
-		}
-	}
-}
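
The deleted StormTupleTest above exercises two by-name lookup paths: positional lookup through a Fields schema, and reflective fallback to a public member or getter of the wrapped object when the schema is null. A short sketch of both, assuming the accessor behavior the deleted assertions verified (the class name is illustrative):

package org.apache.flink.stormcompatibility.wrappers;

import backtype.storm.tuple.Fields;

import org.apache.flink.api.java.tuple.Tuple2;

import org.junit.Assert;
import org.junit.Test;

public class StormTupleLookupSketch {

	/** Minimal POJO with a public member, like TestPojoMember above. */
	public static class Pojo {
		public String member = "hello";
	}

	@Test
	public void lookupBySchemaAndByReflection() {
		// 1) schema-based lookup: the Fields schema maps names to positions
		final Fields schema = new Fields("id", "name");
		final StormTuple<Tuple2<Integer, String>> bySchema = new StormTuple<Tuple2<Integer, String>>(
				new Tuple2<Integer, String>(7, "seven"), schema);
		Assert.assertEquals(1, bySchema.fieldIndex("name"));
		Assert.assertEquals("seven", bySchema.getStringByField("name"));

		// 2) reflective lookup: with a null schema, field access falls back to
		//    the wrapped object's public members (or getters)
		final StormTuple<Pojo> byPojo = new StormTuple<Pojo>(new Pojo(), null);
		Assert.assertEquals("hello", byPojo.getStringByField("member"));
	}
}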

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
deleted file mode 100644
index c799d63..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.api.TestTopologyBuilder;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.TestDummyBolt;
-import org.apache.flink.stormcompatibility.util.TestDummySpout;
-import org.apache.flink.stormcompatibility.util.TestSink;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.google.common.collect.Sets;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@PowerMockIgnore("javax.*")
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormWrapperSetupHelper.class)
-public class StormWrapperSetupHelperTest extends AbstractTest {
-
-	@Test
-	public void testEmptyDeclarerBolt() {
-		IComponent boltOrSpout;
-
-		if (this.r.nextBoolean()) {
-			boltOrSpout = mock(IRichSpout.class);
-		} else {
-			boltOrSpout = mock(IRichBolt.class);
-		}
-
-		Assert.assertEquals(new HashMap<String, Integer>(),
-				StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testRawType() throws Exception {
-		IComponent boltOrSpout;
-
-		if (this.r.nextBoolean()) {
-			boltOrSpout = mock(IRichSpout.class);
-		} else {
-			boltOrSpout = mock(IRichBolt.class);
-		}
-
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		declarer.declare(new Fields("dummy1", "dummy2"));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
-				Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testToManyAttributes() throws Exception {
-		IComponent boltOrSpout;
-
-		if (this.r.nextBoolean()) {
-			boltOrSpout = mock(IRichSpout.class);
-		} else {
-			boltOrSpout = mock(IRichBolt.class);
-		}
-
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		final String[] schema = new String[26];
-		for (int i = 0; i < schema.length; ++i) {
-			schema[i] = "a" + i;
-		}
-		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null);
-	}
-
-	@Test
-	public void testTupleTypes() throws Exception {
-		for (int i = -1; i < 26; ++i) {
-			this.testTupleTypes(i);
-		}
-	}
-
-	private void testTupleTypes(final int numberOfAttributes) throws Exception {
-		String[] schema;
-		if (numberOfAttributes == -1) {
-			schema = new String[1];
-		} else {
-			schema = new String[numberOfAttributes];
-		}
-		for (int i = 0; i < schema.length; ++i) {
-			schema[i] = "a" + i;
-		}
-
-		IComponent boltOrSpout;
-		if (this.r.nextBoolean()) {
-			boltOrSpout = mock(IRichSpout.class);
-		} else {
-			boltOrSpout = mock(IRichBolt.class);
-		}
-
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-		declarer.declare(new Fields(schema));
-		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		HashMap<String, Integer> attributes = new HashMap<String, Integer>();
-		attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
-
-		Assert.assertEquals(attributes, StormWrapperSetupHelper.getNumberOfAttributes(
-				boltOrSpout,
-				numberOfAttributes == -1
-						? Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID })
-						: null));
-	}
-
-	@Test
-	public void testCreateTopologyContext() {
-		HashMap<String, Integer> dops = new HashMap<String, Integer>();
-		dops.put("spout1", 1);
-		dops.put("spout2", 3);
-		dops.put("bolt1", 1);
-		dops.put("bolt2", 2);
-		dops.put("sink", 1);
-
-		HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
-		taskCounter.put("spout1", 0);
-		taskCounter.put("spout2", 0);
-		taskCounter.put("bolt1", 0);
-		taskCounter.put("bolt2", 0);
-		taskCounter.put("sink", 0);
-
-		HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
-		operators.put("spout1", new TestDummySpout());
-		operators.put("spout2", new TestDummySpout());
-		operators.put("bolt1", new TestDummyBolt());
-		operators.put("bolt2", new TestDummyBolt());
-		operators.put("sink", new TestSink());
-
-		TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
-		builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
-		builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
-		builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
-		builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
-		.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
-		.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
-		.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
-		.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
-
-		final int maxRetry = 3;
-		int counter;
-		for (counter = 0; counter < maxRetry; ++counter) {
-			LocalCluster cluster = new LocalCluster();
-			Config c = new Config();
-			c.setNumAckers(0);
-			cluster.submitTopology("test", c, builder.createTopology());
-			Utils.sleep((counter + 1) * 5000);
-			cluster.shutdown();
-
-			if (TestSink.result.size() == 8) {
-				break;
-			}
-		}
-		Assert.assertTrue(counter < maxRetry);
-
-		TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();
-
-		flinkBuilder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
-		flinkBuilder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
-		flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
-		flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
-		flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
-		.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
-		.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
-		.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
-		.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
-
-		flinkBuilder.createTopology();
-		StormTopology stormTopology = flinkBuilder.getStormTopology();
-
-		Set<Integer> taskIds = new HashSet<Integer>();
-
-		for (TopologyContext expectedContext : TestSink.result) {
-			final String thisComponentId = expectedContext.getThisComponentId();
-			int index = taskCounter.get(thisComponentId);
-
-			StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-			when(context.getTaskName()).thenReturn(thisComponentId);
-			when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
-			when(context.getIndexOfThisSubtask()).thenReturn(index);
-			taskCounter.put(thisComponentId, ++index);
-
-			Config stormConfig = new Config();
-			stormConfig.put(StormWrapperSetupHelper.TOPOLOGY_NAME, "test");
-
-			TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
-					context, operators.get(thisComponentId), stormTopology, stormConfig);
-
-			ComponentCommon expectedCommon = expectedContext.getComponentCommon(thisComponentId);
-			ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
-
-			Assert.assertNull(topologyContext.getCodeDir());
-			Assert.assertNull(common.get_json_conf());
-			Assert.assertNull(topologyContext.getExecutorData(null));
-			Assert.assertNull(topologyContext.getPIDDir());
-			Assert.assertNull(topologyContext.getResource(null));
-			Assert.assertNull(topologyContext.getSharedExecutor());
-			Assert.assertNull(expectedContext.getTaskData(null));
-			Assert.assertNull(topologyContext.getThisWorkerPort());
-
-			Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
-
-			Assert.assertEquals(expectedCommon.get_inputs(), common.get_inputs());
-			Assert.assertEquals(expectedCommon.get_parallelism_hint(), common.get_parallelism_hint());
-			Assert.assertEquals(expectedCommon.get_streams(), common.get_streams());
-			Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
-			Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
-					topologyContext.getComponentStreams(thisComponentId));
-			Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
-			Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
-			Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
-			Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
-			Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
-
-			for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
-				Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
-			}
-
-			for (String componentId : expectedContext.getComponentIds()) {
-				Assert.assertEquals(expectedContext.getSources(componentId),
-						topologyContext.getSources(componentId));
-				Assert.assertEquals(expectedContext.getTargets(componentId),
-						topologyContext.getTargets(componentId));
-
-				for (String streamId : expectedContext.getComponentStreams(componentId)) {
-					Assert.assertEquals(
-							expectedContext.getComponentOutputFields(componentId, streamId).toList(),
-							topologyContext.getComponentOutputFields(componentId, streamId).toList());
-				}
-			}
-
-			for (String streamId : expectedContext.getThisStreams()) {
-				Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
-						topologyContext.getThisOutputFields(streamId).toList());
-			}
-
-			HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
-			Set<Integer> allTaskIds = new HashSet<Integer>();
-			for (String componentId : expectedContext.getComponentIds()) {
-				List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
-				List<Integer> tasks = topologyContext.getComponentTasks(componentId);
-
-				Iterator<Integer> p_it = possibleTasks.iterator();
-				Iterator<Integer> t_it = tasks.iterator();
-				while(p_it.hasNext()) {
-					Assert.assertTrue(t_it.hasNext());
-					Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
-					Assert.assertTrue(allTaskIds.add(t_it.next()));
-				}
-				Assert.assertFalse(t_it.hasNext());
-			}
-
-			Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
-			Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
-
-			try {
-				topologyContext.getHooks();
-				Assert.fail();
-			} catch (UnsupportedOperationException e) { /* expected */ }
-
-			try {
-				topologyContext.getRegisteredMetricByName(null);
-				Assert.fail();
-			} catch (UnsupportedOperationException e) { /* expected */ }
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
deleted file mode 100644
index 7c91e6f..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.util.LinkedList;
-
-class TestContext implements SourceContext<Tuple1<Integer>> {
-	public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
-
-	public TestContext() {
-	}
-
-	@Override
-	public void collect(final Tuple1<Integer> record) {
-		this.result.add(record.copy());
-	}
-
-	@Override
-	public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {
-		this.result.add(element.copy());
-	}
-
-	@Override
-	public void emitWatermark(Watermark mark) {
-		// ignore it
-	}
-
-	@Override
-	public Object getCheckpointLock() {
-		return null;
-	}
-
-	@Override
-	public void close() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties
deleted file mode 100644
index ed2bbcb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file controls the log output of tests executed from the IDE (logging is switched off by default)
-
-log4j.rootLogger=OFF, console
-
-# Write log output to the console (System.err)
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 4f56748..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
deleted file mode 100644
index 6290df2..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
+++ /dev/null
@@ -1,20 +0,0 @@
-# flink-storm-examples
-
-This module contains multiple versions of a simple word-count example to illustrate the usage of the compatibility layer:
-* the usage of spouts or bolts within a regular Flink streaming program (i.e., embedded spouts or bolts)
-   1. `SpoutSourceWordCount` uses a spout as data source within a Flink streaming program
-   2. `BoltTokenizerWordCount` uses a bolt to split sentences into words within a Flink streaming program
-      * `BoltTokenizerWordCountWithNames` uses the Tuple input type and accesses attributes by field name (rather than by index)
-      * `BoltTokenizerWordCountPOJO` uses a POJO input type and accesses attributes by field name (rather than by index)
-
-* how to submit a whole Storm topology to Flink
-   3. `WordCountTopology` plugs a Storm topology together
-      * `StormWordCountLocal` submits the topology to a local Flink cluster (similar to a `LocalCluster` in Storm)
-        (`StormWordCountNamedLocal` accesses attributes by field name rather than by index)
-      * `StormWordCountRemoteByClient` submits the topology to a remote Flink cluster (similar to the usage of `NimbusClient` in Storm)
-      * `StormWordCountRemoteBySubmitter` submits the topology to a remote Flink cluster (similar to the usage of `StormSubmitter` in Storm)
-
-Additionally, this module packages the three word-count example programs as jar files that can be submitted to a Flink cluster via `bin/flink run example.jar`.
-(Valid jars are `WordCount-SpoutSource.jar`, `WordCount-BoltTokenizer.jar`, and `WordCount-StormTopology.jar`.)
-
-The package `org.apache.flink.stormcompatibility.wordcount.stormoperators` contains the original Storm spouts and bolts, which can be used unmodified within Storm or Flink.
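For context, the "embedded spouts or bolts" usage that the deleted README describes can be sketched roughly as below. This is a minimal, hypothetical sketch and not part of the diff: it assumes the pre-rename class names of this module (`StormSpoutWrapper`, `StormBoltWrapper`, `StormFileSpout`, `StormBoltTokenizer`), which this very commit renames, and the method signatures are assumptions based on the module's documentation rather than anything shown above.

```java
// Hypothetical sketch only: assumes the pre-rename API of the
// flink-storm-compatibility module; this commit renames these classes.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormFileSpout;
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;

public class EmbeddedWordCountSketch {
	public static void main(final String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Embed an unmodified Storm spout as a Flink source (cf. SpoutSourceWordCount).
		// The output type is declared explicitly, since Storm spouts are untyped.
		final DataStream<String> sentences = env.addSource(
				new StormSpoutWrapper<String>(new StormFileSpout("/path/to/input.txt")),
				TypeExtractor.getForClass(String.class));

		// Embed an unmodified Storm bolt as a regular operator (cf. BoltTokenizerWordCount).
		// The bolt splits each sentence into (word, 1) pairs.
		final DataStream<Tuple2<String, Integer>> tokenized = sentences.transform(
				"tokenizer", // operator name
				TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
				new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()));

		tokenized.print();
		env.execute("embedded spout/bolt word count");
	}
}
```

For whole-topology submission (cf. `StormWordCountLocal`), the module's `FlinkTopologyBuilder` and `FlinkLocalCluster` take the roles of Storm's `TopologyBuilder` and `LocalCluster`; the `testCreateTopologyContext` test above exercises exactly this builder path.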