You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:35 UTC
[10/15] flink git commit: [Storm Compatibility] Maven module
restucturing and cleanup - removed storm-parent;
renamed storm-core and storm-examples - updated internal Java package
structure * renamed package "stormcompatibility" to "storm" *
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
deleted file mode 100644
index 6817593..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.HashSet;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.same;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
-public class StormBoltWrapperTest extends AbstractTest {
-
- @Test(expected = IllegalArgumentException.class)
- public void testWrapperRawType() throws Exception {
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- declarer.declare(new Fields("dummy1", "dummy2"));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- new StormBoltWrapper<Object, Object>(mock(IRichBolt.class),
- new String[] { Utils.DEFAULT_STREAM_ID });
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testWrapperToManyAttributes1() throws Exception {
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- final String[] schema = new String[26];
- for (int i = 0; i < schema.length; ++i) {
- schema[i] = "a" + i;
- }
- declarer.declare(new Fields(schema));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testWrapperToManyAttributes2() throws Exception {
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- final String[] schema = new String[26];
- for (int i = 0; i < schema.length; ++i) {
- schema[i] = "a" + i;
- }
- declarer.declare(new Fields(schema));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
- }
-
- @Test
- public void testWrapper() throws Exception {
- for (int i = -1; i < 26; ++i) {
- this.testWrapper(i);
- }
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private void testWrapper(final int numberOfAttributes) throws Exception {
- assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25));
- Tuple flinkTuple = null;
- String rawTuple = null;
-
- if (numberOfAttributes == -1) {
- rawTuple = "test";
- } else {
- flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
- }
-
- final String[] schema;
- if (numberOfAttributes == -1) {
- schema = new String[1];
- } else {
- schema = new String[numberOfAttributes];
- }
- for (int i = 0; i < schema.length; ++i) {
- schema[i] = "a" + i;
- }
-
- final StreamRecord record = mock(StreamRecord.class);
- if (numberOfAttributes == -1) {
- when(record.getValue()).thenReturn(rawTuple);
- } else {
- when(record.getValue()).thenReturn(flinkTuple);
- }
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final IRichBolt bolt = mock(IRichBolt.class);
-
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- declarer.declare(new Fields(schema));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
- wrapper.setup(mock(Output.class), taskContext);
- wrapper.open(null);
-
- wrapper.processElement(record);
- if (numberOfAttributes == -1) {
- verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
- } else {
- verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testMultipleOutputStreams() throws Exception {
- final boolean rawOutType1 = super.r.nextBoolean();
- final boolean rawOutType2 = super.r.nextBoolean();
-
- final StreamRecord record = mock(StreamRecord.class);
- when(record.getValue()).thenReturn(2).thenReturn(3);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final Output output = mock(Output.class);
-
- final TestBolt bolt = new TestBolt();
- final HashSet<String> raw = new HashSet<String>();
- if (rawOutType1) {
- raw.add("stream1");
- }
- if (rawOutType2) {
- raw.add("stream2");
- }
-
- final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null, raw);
- wrapper.setup(output, taskContext);
- wrapper.open(null);
-
- final SplitStreamType splitRecord = new SplitStreamType<Integer>();
- if (rawOutType1) {
- splitRecord.streamId = "stream1";
- splitRecord.value = 2;
- } else {
- splitRecord.streamId = "stream1";
- splitRecord.value = new Tuple1<Integer>(2);
- }
- wrapper.processElement(record);
- verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
-
- if (rawOutType2) {
- splitRecord.streamId = "stream2";
- splitRecord.value = 3;
- } else {
- splitRecord.streamId = "stream2";
- splitRecord.value = new Tuple1<Integer>(3);
- }
- wrapper.processElement(record);
- verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testOpen() throws Exception {
- final StormConfig stormConfig = new StormConfig();
- final Configuration flinkConfig = new Configuration();
-
- final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
- when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
- .thenReturn(flinkConfig);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- declarer.declare(new Fields("dummy"));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-
- final IRichBolt bolt = mock(IRichBolt.class);
- final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
- wrapper.setup(mock(Output.class), taskContext);
-
- // test without configuration
- wrapper.open(null);
- verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
-
- // test with StormConfig
- wrapper.open(null);
- verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
- any(OutputCollector.class));
-
- // test with Configuration
- wrapper.open(null);
- verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
- any(OutputCollector.class));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testOpenSink() throws Exception {
- final StormConfig stormConfig = new StormConfig();
- final Configuration flinkConfig = new Configuration();
-
- final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
- when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
- .thenReturn(flinkConfig);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final IRichBolt bolt = mock(IRichBolt.class);
-
- final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
- wrapper.setup(mock(Output.class), taskContext);
-
- // test without configuration
- wrapper.open(null);
- verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
- isNull(OutputCollector.class));
-
- // test with StormConfig
- wrapper.open(null);
- verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
- isNull(OutputCollector.class));
-
- // test with Configuration
- wrapper.open(null);
- verify(bolt, times(3)).prepare(eq(flinkConfig.toMap()), any(TopologyContext.class),
- isNull(OutputCollector.class));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testClose() throws Exception {
- final IRichBolt bolt = mock(IRichBolt.class);
-
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- declarer.declare(new Fields("dummy"));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- wrapper.setup(mock(Output.class), taskContext);
-
- wrapper.close();
- wrapper.dispose();
-
- verify(bolt).cleanup();
- }
-
- private static final class TestBolt implements IRichBolt {
- private static final long serialVersionUID = 7278692872260138758L;
- private OutputCollector collector;
-
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- int counter = 0;
- @Override
- public void execute(backtype.storm.tuple.Tuple input) {
- if (++counter % 2 == 1) {
- this.collector.emit("stream1", new Values(input.getInteger(0)));
- } else {
- this.collector.emit("stream2", new Values(input.getInteger(0)));
- }
- }
-
- @Override
- public void cleanup() {}
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream("stream1", new Fields("a1"));
- declarer.declareStream("stream2", new Fields("a2"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
deleted file mode 100644
index 77f1b05..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.LinkedList;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormWrapperSetupHelper.class)
-public class StormFiniteSpoutWrapperTest extends AbstractTest {
-
- @SuppressWarnings("unchecked")
- @Test
- public void testRunExecuteFixedNumber() throws Exception {
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- declarer.declare(new Fields("dummy"));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final IRichSpout spout = mock(IRichSpout.class);
- final int numberOfCalls = this.r.nextInt(50);
- final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
- spoutWrapper.setRuntimeContext(taskContext);
-
- spoutWrapper.run(mock(SourceContext.class));
- verify(spout, times(numberOfCalls)).nextTuple();
- }
-
- @Test
- public void testRunExecute() throws Exception {
- final int numberOfCalls = this.r.nextInt(50);
-
- final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
- for (int i = numberOfCalls - 1; i >= 0; --i) {
- expectedResult.add(new Tuple1<Integer>(new Integer(i)));
- }
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
- final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
- spout);
- spoutWrapper.setRuntimeContext(taskContext);
-
- final TestContext collector = new TestContext();
- spoutWrapper.run(collector);
-
- Assert.assertEquals(expectedResult, collector.result);
- }
-
- @Test
- public void testCancel() throws Exception {
- final int numberOfCalls = 5 + this.r.nextInt(5);
-
- final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
- expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
-
- StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
- final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
- spout);
-
- spoutWrapper.cancel();
- final TestContext collector = new TestContext();
- spoutWrapper.run(collector);
-
- Assert.assertEquals(expectedResult, collector.result);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
deleted file mode 100644
index 36ed58a..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Values;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class StormSpoutCollectorTest extends AbstractTest {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
- for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
- final SourceContext flinkCollector = mock(SourceContext.class);
- Tuple flinkTuple = null;
- final Values tuple = new Values();
-
- StormSpoutCollector<?> collector;
-
- final String streamId = "streamId";
- HashMap<String, Integer> attributes = new HashMap<String, Integer>();
- attributes.put(streamId, numberOfAttributes);
-
- if (numberOfAttributes == -1) {
- collector = new StormSpoutCollector(attributes, flinkCollector);
- tuple.add(new Integer(this.r.nextInt()));
-
- } else {
- collector = new StormSpoutCollector(attributes, flinkCollector);
- flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-
- for (int i = 0; i < numberOfAttributes; ++i) {
- tuple.add(new Integer(this.r.nextInt()));
- flinkTuple.setField(tuple.get(i), i);
- }
- }
-
- final List<Integer> taskIds;
- final Object messageId = new Integer(this.r.nextInt());
-
- taskIds = collector.emit(streamId, tuple, messageId);
-
- Assert.assertNull(taskIds);
-
- if (numberOfAttributes == -1) {
- verify(flinkCollector).collect(tuple.get(0));
- } else {
- verify(flinkCollector).collect(flinkTuple);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = UnsupportedOperationException.class)
- public void testReportError() {
- new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class))
- .reportError(null);
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = UnsupportedOperationException.class)
- public void testEmitDirect() {
- new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect(
- 0, null, null,
- (Object) null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
deleted file mode 100644
index f4fb4da..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.LinkedList;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormWrapperSetupHelper.class)
-public class StormSpoutWrapperTest extends AbstractTest {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testRunPrepare() throws Exception {
- final StormConfig stormConfig = new StormConfig();
- final Configuration flinkConfig = new Configuration();
-
- final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
- when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
- .thenReturn(flinkConfig);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
-
- final IRichSpout spout = mock(IRichSpout.class);
- final StormSpoutWrapper spoutWrapper = new StormSpoutWrapper(spout);
- spoutWrapper.setRuntimeContext(taskContext);
- spoutWrapper.isRunning = false;
-
- // test without configuration
- spoutWrapper.run(mock(SourceContext.class));
- verify(spout).open(any(Map.class), any(TopologyContext.class),
- any(SpoutOutputCollector.class));
-
- // test with StormConfig
- spoutWrapper.run(mock(SourceContext.class));
- verify(spout).open(same(stormConfig), any(TopologyContext.class),
- any(SpoutOutputCollector.class));
-
- // test with Configuration
- spoutWrapper.run(mock(SourceContext.class));
- verify(spout, times(3)).open(eq(flinkConfig.toMap()), any(TopologyContext.class),
- any(SpoutOutputCollector.class));
- }
-
- @Test
- public void testRunExecuteCancelInfinite() throws Exception {
- final int numberOfCalls = 5 + this.r.nextInt(5);
-
- final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
- when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
- when(taskContext.getTaskName()).thenReturn("name");
-
- final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
-
-
- final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
- spoutWrapper.setRuntimeContext(taskContext);
-
- spoutWrapper.cancel();
- final TestContext collector = new TestContext();
- spoutWrapper.run(collector);
-
- Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
- }
-
- @Test
- public void testClose() throws Exception {
- final IRichSpout spout = mock(IRichSpout.class);
- final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
-
- spoutWrapper.close();
-
- verify(spout).close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
deleted file mode 100644
index 06d5399..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
+++ /dev/null
@@ -1,659 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-
-public class StormTupleTest extends AbstractTest {
- private static final String fieldName = "fieldName";
- private static final String fieldNamePojo = "member";
-
- private int arity, index;
-
- @Override
- @Before
- public void prepare() {
- super.prepare();
- this.arity = 1 + r.nextInt(25);
- this.index = r.nextInt(this.arity);
- }
-
- @Test
- public void nonTupleTest() {
- final Object flinkTuple = this.r.nextInt();
-
- final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple, null);
- Assert.assertSame(flinkTuple, tuple.getValue(0));
-
- final List<Object> values = tuple.getValues();
- Assert.assertEquals(1, values.size());
- Assert.assertEquals(flinkTuple, values.get(0));
- }
-
- @Test
- public void tupleTest() throws InstantiationException, IllegalAccessException {
- final int numberOfAttributes = this.r.nextInt(26);
- final Object[] data = new Object[numberOfAttributes];
-
- final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
- for (int i = 0; i < numberOfAttributes; ++i) {
- data[i] = this.r.nextInt();
- flinkTuple.setField(data[i], i);
- }
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- final List<Object> values = tuple.getValues();
-
- Assert.assertEquals(numberOfAttributes, values.size());
- for (int i = 0; i < numberOfAttributes; ++i) {
- Assert.assertEquals(flinkTuple.getField(i), values.get(i));
- }
-
- Assert.assertEquals(numberOfAttributes, tuple.size());
- }
-
- @Test
- public void testBinary() {
- final byte[] data = new byte[this.r.nextInt(15)];
- this.r.nextBytes(data);
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
- }
-
- @Test
- public void testBoolean() {
- final Boolean flinkTuple = this.r.nextBoolean();
-
- final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getBoolean(0));
- }
-
- @Test
- public void testByte() {
- final Byte flinkTuple = (byte) this.r.nextInt();
-
- final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getByte(0));
- }
-
- @Test
- public void testDouble() {
- final Double flinkTuple = this.r.nextDouble();
-
- final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getDouble(0));
- }
-
- @Test
- public void testFloat() {
- final Float flinkTuple = this.r.nextFloat();
-
- final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getFloat(0));
- }
-
- @Test
- public void testInteger() {
- final Integer flinkTuple = this.r.nextInt();
-
- final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getInteger(0));
- }
-
- @Test
- public void testLong() {
- final Long flinkTuple = this.r.nextLong();
-
- final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getLong(0));
- }
-
- @Test
- public void testShort() {
- final Short flinkTuple = (short) this.r.nextInt();
-
- final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getShort(0));
- }
-
- @Test
- public void testString() {
- final byte[] data = new byte[this.r.nextInt(15)];
- this.r.nextBytes(data);
- final String flinkTuple = new String(data);
-
- final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null);
- Assert.assertEquals(flinkTuple, tuple.getString(0));
- }
-
- @Test
- public void testBinaryTuple() {
- final byte[] data = new byte[this.r.nextInt(15)];
- this.r.nextBytes(data);
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
- }
-
- @Test
- public void testBooleanTuple() {
- final Boolean data = this.r.nextBoolean();
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index));
- }
-
- @Test
- public void testByteTuple() {
- final Byte data = (byte) this.r.nextInt();
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index));
- }
-
- @Test
- public void testDoubleTuple() {
- final Double data = this.r.nextDouble();
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index));
- }
-
- @Test
- public void testFloatTuple() {
- final Float data = this.r.nextFloat();
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index));
- }
-
- @Test
- public void testIntegerTuple() {
- final Integer data = this.r.nextInt();
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index));
- }
-
- @Test
- public void testLongTuple() {
- final Long data = this.r.nextLong();
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index));
- }
-
- @Test
- public void testShortTuple() {
- final Short data = (short) this.r.nextInt();
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index));
- }
-
- @Test
- public void testStringTuple() {
- final byte[] rawdata = new byte[this.r.nextInt(15)];
- this.r.nextBytes(rawdata);
- final String data = new String(rawdata);
-
- final int index = this.r.nextInt(5);
- final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
- flinkTuple.setField(data, index);
-
- final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
- Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
- }
-
- @Test
- public void testContains() throws Exception {
- Fields schema = new Fields("a1", "a2");
- StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
- schema);
-
- Assert.assertTrue(tuple.contains("a1"));
- Assert.assertTrue(tuple.contains("a2"));
- Assert.assertFalse(tuple.contains("a3"));
- }
-
- @Test
- public void testGetFields() throws Exception {
- Fields schema = new Fields();
-
- Assert.assertSame(schema, new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
- schema).getFields());
-
- Assert.assertSame(null, new StormTuple<Object>(null, schema).getFields());
-
- }
-
- @Test
- public void testFieldIndex() throws Exception {
- Fields schema = new Fields("a1", "a2");
- StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
- schema);
-
- Assert.assertEquals(0, tuple.fieldIndex("a1"));
- Assert.assertEquals(1, tuple.fieldIndex("a2"));
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSelect() throws Exception {
- Tuple tuple = Tuple.getTupleClass(arity).newInstance();
- Values values = new Values();
-
- ArrayList<String> attributeNames = new ArrayList<String>(arity);
- ArrayList<String> filterNames = new ArrayList<String>(arity);
-
- for (int i = 0; i < arity; ++i) {
- tuple.setField(i, i);
- attributeNames.add("a" + i);
-
- if (r.nextBoolean()) {
- filterNames.add("a" + i);
- values.add(i);
- }
- }
- Fields schema = new Fields(attributeNames);
- Fields selector = new Fields(filterNames);
-
- Assert.assertEquals(values, new StormTuple(tuple, schema).select(selector));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetValueByField() throws Exception {
- Object value = mock(Object.class);
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getValueByField(fieldName));
-
- }
-
- @Test
- public void testGetValueByFieldPojo() throws Exception {
- Object value = mock(Object.class);
- TestPojoMember<Object> pojo = new TestPojoMember<Object>(value);
- StormTuple<TestPojoMember<Object>> tuple = new StormTuple<TestPojoMember<Object>>(pojo,
- null);
- Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
- }
-
- @Test
- public void testGetValueByFieldPojoGetter() throws Exception {
- Object value = mock(Object.class);
- TestPojoGetter<Object> pojo = new TestPojoGetter<Object>(value);
- StormTuple<TestPojoGetter<Object>> tuple = new StormTuple<TestPojoGetter<Object>>(pojo,
- null);
- Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetStringByField() throws Exception {
- String value = "stringValue";
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getStringByField(fieldName));
- }
-
- @Test
- public void testGetStringByFieldPojo() throws Exception {
- String value = "stringValue";
- TestPojoMember<String> pojo = new TestPojoMember<String>(value);
- StormTuple<TestPojoMember<String>> tuple = new StormTuple<TestPojoMember<String>>(pojo,
- null);
- Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
- }
-
- @Test
- public void testGetStringByFieldPojoGetter() throws Exception {
- String value = "stringValue";
- TestPojoGetter<String> pojo = new TestPojoGetter<String>(value);
- StormTuple<TestPojoGetter<String>> tuple = new StormTuple<TestPojoGetter<String>>(pojo,
- null);
- Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetIntegerByField() throws Exception {
- Integer value = r.nextInt();
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getIntegerByField(fieldName));
- }
-
- @Test
- public void testGetIntegerByFieldPojo() throws Exception {
- Integer value = r.nextInt();
- TestPojoMember<Integer> pojo = new TestPojoMember<Integer>(value);
- StormTuple<TestPojoMember<Integer>> tuple = new StormTuple<TestPojoMember<Integer>>(pojo,
- null);
- Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
- }
-
- @Test
- public void testGetIntegerByFieldPojoGetter() throws Exception {
- Integer value = r.nextInt();
- TestPojoGetter<Integer> pojo = new TestPojoGetter<Integer>(value);
- StormTuple<TestPojoGetter<Integer>> tuple = new StormTuple<TestPojoGetter<Integer>>(pojo,
- null);
- Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetLongByField() throws Exception {
- Long value = r.nextLong();
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getLongByField(fieldName));
- }
-
- @Test
- public void testGetLongByFieldPojo() throws Exception {
- Long value = r.nextLong();
- TestPojoMember<Long> pojo = new TestPojoMember<Long>(value);
- StormTuple<TestPojoMember<Long>> tuple = new StormTuple<TestPojoMember<Long>>(pojo,
- null);
- Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
- }
-
- @Test
- public void testGetLongByFieldPojoGetter() throws Exception {
- Long value = r.nextLong();
- TestPojoGetter<Long> pojo = new TestPojoGetter<Long>(value);
- StormTuple<TestPojoGetter<Long>> tuple = new StormTuple<TestPojoGetter<Long>>(pojo,
- null);
- Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetBooleanByField() throws Exception {
- Boolean value = r.nextBoolean();
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertEquals(value, tuple.getBooleanByField(fieldName));
- }
-
- @Test
- public void testGetBooleanByFieldPojo() throws Exception {
- Boolean value = r.nextBoolean();
- TestPojoMember<Boolean> pojo = new TestPojoMember<Boolean>(value);
- StormTuple<TestPojoMember<Boolean>> tuple = new StormTuple<TestPojoMember<Boolean>>(pojo,
- null);
- Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
- }
-
- @Test
- public void testGetBooleanByFieldPojoGetter() throws Exception {
- Boolean value = r.nextBoolean();
- TestPojoGetter<Boolean> pojo = new TestPojoGetter<Boolean>(value);
- StormTuple<TestPojoGetter<Boolean>> tuple = new StormTuple<TestPojoGetter<Boolean>>(pojo,
- null);
- Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetShortByField() throws Exception {
- Short value = (short) r.nextInt();
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getShortByField(fieldName));
- }
-
- @Test
- public void testGetShortByFieldPojo() throws Exception {
- Short value = (short) r.nextInt();
- TestPojoMember<Short> pojo = new TestPojoMember<Short>(value);
- StormTuple<TestPojoMember<Short>> tuple = new StormTuple<TestPojoMember<Short>>(pojo,
- null);
- Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
- }
-
- @Test
- public void testGetShortByFieldPojoGetter() throws Exception {
- Short value = (short) r.nextInt();
- TestPojoGetter<Short> pojo = new TestPojoGetter<Short>(value);
- StormTuple<TestPojoGetter<Short>> tuple = new StormTuple<TestPojoGetter<Short>>(pojo,
- null);
- Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetByteByField() throws Exception {
- Byte value = new Byte((byte) r.nextInt());
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getByteByField(fieldName));
- }
-
- @Test
- public void testGetByteByFieldPojo() throws Exception {
- Byte value = new Byte((byte) r.nextInt());
- TestPojoMember<Byte> pojo = new TestPojoMember<Byte>(value);
- StormTuple<TestPojoMember<Byte>> tuple = new StormTuple<TestPojoMember<Byte>>(pojo,
- null);
- Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
- }
-
- @Test
- public void testGetByteByFieldPojoGetter() throws Exception {
- Byte value = new Byte((byte) r.nextInt());
- TestPojoGetter<Byte> pojo = new TestPojoGetter<Byte>(value);
- StormTuple<TestPojoGetter<Byte>> tuple = new StormTuple<TestPojoGetter<Byte>>(pojo,
- null);
- Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetDoubleByField() throws Exception {
- Double value = r.nextDouble();
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getDoubleByField(fieldName));
- }
-
- @Test
- public void testGetDoubleByFieldPojo() throws Exception {
- Double value = r.nextDouble();
- TestPojoMember<Double> pojo = new TestPojoMember<Double>(value);
- StormTuple<TestPojoMember<Double>> tuple = new StormTuple<TestPojoMember<Double>>(pojo,
- null);
- Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
- }
-
- @Test
- public void testGetDoubleByFieldPojoGetter() throws Exception {
- Double value = r.nextDouble();
- TestPojoGetter<Double> pojo = new TestPojoGetter<Double>(value);
- StormTuple<TestPojoGetter<Double>> tuple = new StormTuple<TestPojoGetter<Double>>(pojo,
- null);
- Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetFloatByField() throws Exception {
- Float value = r.nextFloat();
- StormTuple tuple = testGetByField(arity, index, value);
- Assert.assertSame(value, tuple.getFloatByField(fieldName));
- }
-
- @Test
- public void testGetFloatByFieldPojo() throws Exception {
- Float value = r.nextFloat();
- TestPojoMember<Float> pojo = new TestPojoMember<Float>(value);
- StormTuple<TestPojoMember<Float>> tuple = new StormTuple<TestPojoMember<Float>>(pojo,
- null);
- Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
- }
-
- @Test
- public void testGetFloatByFieldPojoGetter() throws Exception {
- Float value = r.nextFloat();
- TestPojoGetter<Float> pojo = new TestPojoGetter<Float>(value);
- StormTuple<TestPojoGetter<Float>> tuple = new StormTuple<TestPojoGetter<Float>>(pojo,
- null);
- Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
- }
-
- @SuppressWarnings("rawtypes")
- @Test
- public void testGetBinaryByField() throws Exception {
- byte[] data = new byte[1 + r.nextInt(20)];
- r.nextBytes(data);
- StormTuple tuple = testGetByField(arity, index, data);
- Assert.assertSame(data, tuple.getBinaryByField(fieldName));
- }
-
- @Test
- public void testGetBinaryFieldPojo() throws Exception {
- byte[] data = new byte[1 + r.nextInt(20)];
- r.nextBytes(data);
- TestPojoMember<byte[]> pojo = new TestPojoMember<byte[]>(data);
- StormTuple<TestPojoMember<byte[]>> tuple = new StormTuple<TestPojoMember<byte[]>>(pojo,
- null);
- Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
- }
-
- @Test
- public void testGetBinaryByFieldPojoGetter() throws Exception {
- byte[] data = new byte[1 + r.nextInt(20)];
- r.nextBytes(data);
- TestPojoGetter<byte[]> pojo = new TestPojoGetter<byte[]>(data);
- StormTuple<TestPojoGetter<byte[]>> tuple = new StormTuple<TestPojoGetter<byte[]>>(pojo,
- null);
- Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private <T> StormTuple testGetByField(int arity, int index, T value)
- throws Exception {
-
- assert (index < arity);
-
- Tuple tuple = Tuple.getTupleClass(arity).newInstance();
- tuple.setField(value, index);
-
- ArrayList<String> attributeNames = new ArrayList<String>(arity);
- for(int i = 0; i < arity; ++i) {
- if(i == index) {
- attributeNames.add(fieldName);
- } else {
- attributeNames.add("" + i);
- }
- }
- Fields schema = new Fields(attributeNames);
-
- return new StormTuple(tuple, schema);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testGetSourceGlobalStreamid() {
- new StormTuple<Object>(null, null).getSourceGlobalStreamid();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testGetSourceComponent() {
- new StormTuple<Object>(null, null).getSourceComponent();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testGetSourceTask() {
- new StormTuple<Object>(null, null).getSourceTask();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testGetSourceStreamId() {
- new StormTuple<Object>(null, null).getSourceStreamId();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testGetMessageId() {
- new StormTuple<Object>(null, null).getMessageId();
- }
-
- public static class TestPojoMember<T> {
- public T member;
-
- public TestPojoMember(T value) {
- this.member = value;
- }
- }
-
- public static class TestPojoGetter<T> {
- private T member;
-
- public TestPojoGetter(T value) {
- this.member = value;
- }
-
- public T getMember() {
- return this.member;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
deleted file mode 100644
index c799d63..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.api.TestTopologyBuilder;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.stormcompatibility.util.TestDummyBolt;
-import org.apache.flink.stormcompatibility.util.TestDummySpout;
-import org.apache.flink.stormcompatibility.util.TestSink;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.google.common.collect.Sets;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@PowerMockIgnore("javax.*")
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormWrapperSetupHelper.class)
-public class StormWrapperSetupHelperTest extends AbstractTest {
-
- @Test
- public void testEmptyDeclarerBolt() {
- IComponent boltOrSpout;
-
- if (this.r.nextBoolean()) {
- boltOrSpout = mock(IRichSpout.class);
- } else {
- boltOrSpout = mock(IRichBolt.class);
- }
-
- Assert.assertEquals(new HashMap<String, Integer>(),
- StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testRawType() throws Exception {
- IComponent boltOrSpout;
-
- if (this.r.nextBoolean()) {
- boltOrSpout = mock(IRichSpout.class);
- } else {
- boltOrSpout = mock(IRichBolt.class);
- }
-
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- declarer.declare(new Fields("dummy1", "dummy2"));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
- Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testToManyAttributes() throws Exception {
- IComponent boltOrSpout;
-
- if (this.r.nextBoolean()) {
- boltOrSpout = mock(IRichSpout.class);
- } else {
- boltOrSpout = mock(IRichBolt.class);
- }
-
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- final String[] schema = new String[26];
- for (int i = 0; i < schema.length; ++i) {
- schema[i] = "a" + i;
- }
- declarer.declare(new Fields(schema));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null);
- }
-
- @Test
- public void testTupleTypes() throws Exception {
- for (int i = -1; i < 26; ++i) {
- this.testTupleTypes(i);
- }
- }
-
- private void testTupleTypes(final int numberOfAttributes) throws Exception {
- String[] schema;
- if (numberOfAttributes == -1) {
- schema = new String[1];
- } else {
- schema = new String[numberOfAttributes];
- }
- for (int i = 0; i < schema.length; ++i) {
- schema[i] = "a" + i;
- }
-
- IComponent boltOrSpout;
- if (this.r.nextBoolean()) {
- boltOrSpout = mock(IRichSpout.class);
- } else {
- boltOrSpout = mock(IRichBolt.class);
- }
-
- final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
- declarer.declare(new Fields(schema));
- PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
- HashMap<String, Integer> attributes = new HashMap<String, Integer>();
- attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
-
- Assert.assertEquals(attributes, StormWrapperSetupHelper.getNumberOfAttributes(
- boltOrSpout,
- numberOfAttributes == -1 ? Sets
- .newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
- }
-
- @Test
- public void testCreateTopologyContext() {
- HashMap<String, Integer> dops = new HashMap<String, Integer>();
- dops.put("spout1", 1);
- dops.put("spout2", 3);
- dops.put("bolt1", 1);
- dops.put("bolt2", 2);
- dops.put("sink", 1);
-
- HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
- taskCounter.put("spout1", 0);
- taskCounter.put("spout2", 0);
- taskCounter.put("bolt1", 0);
- taskCounter.put("bolt2", 0);
- taskCounter.put("sink", 0);
-
- HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
- operators.put("spout1", new TestDummySpout());
- operators.put("spout2", new TestDummySpout());
- operators.put("bolt1", new TestDummyBolt());
- operators.put("bolt2", new TestDummyBolt());
- operators.put("sink", new TestSink());
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
- builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
- builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
- builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
- builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
- .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
- .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
- .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
- .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
-
- final int maxRetry = 3;
- int counter;
- for (counter = 0; counter < maxRetry; ++counter) {
- LocalCluster cluster = new LocalCluster();
- Config c = new Config();
- c.setNumAckers(0);
- cluster.submitTopology("test", c, builder.createTopology());
- Utils.sleep((counter + 1) * 5000);
- cluster.shutdown();
-
- if (TestSink.result.size() == 8) {
- break;
- }
- }
- Assert.assertTrue(counter < maxRetry);
-
- TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();
-
- flinkBuilder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
- flinkBuilder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
- flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
- flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
- flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
- .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
- .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
- .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
- .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
-
- flinkBuilder.createTopology();
- StormTopology stormTopology = flinkBuilder.getStormTopology();
-
- Set<Integer> taskIds = new HashSet<Integer>();
-
- for (TopologyContext expectedContext : TestSink.result) {
- final String thisComponentId = expectedContext.getThisComponentId();
- int index = taskCounter.get(thisComponentId);
-
- StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
- when(context.getTaskName()).thenReturn(thisComponentId);
- when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
- when(context.getIndexOfThisSubtask()).thenReturn(index);
- taskCounter.put(thisComponentId, ++index);
-
- Config stormConfig = new Config();
- stormConfig.put(StormWrapperSetupHelper.TOPOLOGY_NAME, "test");
-
- TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
- context, operators.get(thisComponentId), stormTopology, stormConfig);
-
- ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
- ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
-
- Assert.assertNull(topologyContext.getCodeDir());
- Assert.assertNull(common.get_json_conf());
- Assert.assertNull(topologyContext.getExecutorData(null));
- Assert.assertNull(topologyContext.getPIDDir());
- Assert.assertNull(topologyContext.getResource(null));
- Assert.assertNull(topologyContext.getSharedExecutor());
- Assert.assertNull(expectedContext.getTaskData(null));
- Assert.assertNull(topologyContext.getThisWorkerPort());
-
- Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
-
- Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs());
- Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint());
- Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams());
- Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
- Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
- topologyContext.getComponentStreams(thisComponentId));
- Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
- Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
- Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
- Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
- Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
-
- for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
- Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
- }
-
- for (String componentId : expectedContext.getComponentIds()) {
- Assert.assertEquals(expectedContext.getSources(componentId),
- topologyContext.getSources(componentId));
- Assert.assertEquals(expectedContext.getTargets(componentId),
- topologyContext.getTargets(componentId));
-
- for (String streamId : expectedContext.getComponentStreams(componentId)) {
- Assert.assertEquals(
- expectedContext.getComponentOutputFields(componentId, streamId).toList(),
- topologyContext.getComponentOutputFields(componentId, streamId).toList());
- }
- }
-
- for (String streamId : expectedContext.getThisStreams()) {
- Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
- topologyContext.getThisOutputFields(streamId).toList());
- }
-
- HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
- Set<Integer> allTaskIds = new HashSet<Integer>();
- for (String componentId : expectedContext.getComponentIds()) {
- List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
- List<Integer> tasks = topologyContext.getComponentTasks(componentId);
-
- Iterator<Integer> p_it = possibleTasks.iterator();
- Iterator<Integer> t_it = tasks.iterator();
- while(p_it.hasNext()) {
- Assert.assertTrue(t_it.hasNext());
- Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
- Assert.assertTrue(allTaskIds.add(t_it.next()));
- }
- Assert.assertFalse(t_it.hasNext());
- }
-
- Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
- Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
-
- try {
- topologyContext.getHooks();
- Assert.fail();
- } catch (UnsupportedOperationException e) { /* expected */ }
-
- try {
- topologyContext.getRegisteredMetricByName(null);
- Assert.fail();
- } catch (UnsupportedOperationException e) { /* expected */ }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
deleted file mode 100644
index 7c91e6f..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.util.LinkedList;
-
-class TestContext implements SourceContext<Tuple1<Integer>> {
- public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
-
- public TestContext() {
- }
-
- @Override
- public void collect(final Tuple1<Integer> record) {
- this.result.add(record.copy());
- }
-
- @Override
- public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {
- this.result.add(element.copy());
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- // ignore it
- }
-
- @Override
- public Object getCheckpointLock() {
- return null;
- }
-
- @Override
- public void close() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties
deleted file mode 100644
index ed2bbcb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=OFF, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 4f56748..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
deleted file mode 100644
index 6290df2..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
+++ /dev/null
@@ -1,20 +0,0 @@
-# flink-storm-examples
-
-This module contains multiple versions of a simple word-count-example to illustrate the usage of the compatibility layer:
-* the usage of spouts or bolt within a regular Flink streaming program (ie, embedded spouts or bolts)
- 1. `SpoutSourceWordCount` uses a spout as data source within a Flink streaming program
- 2. `BoltTokenizeerWordCount` uses a bolt to split sentences into words within a Flink streaming program
- * `BoltTokenizeerWordCountWithNames` used Tuple input type and access attributes by field names (rather than index)
- * `BoltTokenizeerWordCountPOJO` used POJO input type and access attributes by field names (rather then index)
-
-* how to submit a whole Storm topology to Flink
- 3. `WordCountTopology` plugs a Storm topology together
- * `StormWordCountLocal` submits the topology to a local Flink cluster (similiar to a `LocalCluster` in Storm)
- (`StormWordCountNamedLocal` access attributes by field names rather than index)
- * `StormWordCountRemoteByClient` submits the topology to a remote Flink cluster (simliar to the usage of `NimbusClient` in Storm)
- * `StormWordCountRemoteBySubmitter` submits the topology to a remote Flink cluster (simliar to the usage of `StormSubmitter` in Storm)
-
-Additionally, this module package the three examples word-count programs as jar files to be submitted to a Flink cluster via `bin/flink run example.jar`.
-(Valid jars are `WordCount-SpoutSource.jar`, `WordCount-BoltTokenizer.jar`, and `WordCount-StormTopology.jar`)
-
-The package `org.apache.flink.stormcompatiblitly.wordcount.stormoperators` contain original Storm spouts and bolts that can be used unmodified within Storm or Flink.