You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:39 UTC
[14/15] flink git commit: [FLINK-2566] FlinkTopologyContext not
populated completely - extended FlinkTopologyContext to be populated with all
supportable attributes - added JUnit test - updated README.md additionally:
module restructuring to get cle
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
new file mode 100644
index 0000000..1891873
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+public class FiniteTestSpout implements IRichSpout {
+ private static final long serialVersionUID = 7992419478267824279L;
+
+ private int numberOfOutputTuples;
+ private SpoutOutputCollector collector;
+
+ public FiniteTestSpout(final int numberOfOutputTuples) {
+ this.numberOfOutputTuples = numberOfOutputTuples;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void close() {/* nothing to do */}
+
+ @Override
+ public void activate() {/* nothing to do */}
+
+ @Override
+ public void deactivate() {/* nothing to do */}
+
+ @Override
+ public void nextTuple() {
+ if (--this.numberOfOutputTuples >= 0) {
+ this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
+ }
+ }
+
+ @Override
+ public void ack(final Object msgId) {/* nothing to do */}
+
+ @Override
+ public void fail(final Object msgId) {/* nothing to do */}
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("dummy"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..8e63563
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
+public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
+
+
+
+ @Test
+ public void testNull() {
+ Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
+ }
+
+ @Test
+ public void testDeclare() {
+ for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
+ for (int j = 1; j < 2; ++j) { // number of streams
+ for (int k = 0; k <= 25; ++k) { // number of attributes
+ this.runDeclareTest(i, j, k);
+ }
+ }
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDeclareSimpleToManyAttributes() {
+ this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDeclareNonDirectToManyAttributes() {
+ this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDeclareDefaultStreamToManyAttributes() {
+ this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDeclareFullToManyAttributes() {
+ this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
+ }
+
+ private void runDeclareTest(final int testCase, final int numberOfStreams,
+ final int numberOfAttributes) {
+ final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+
+ String[] streams = null;
+ if (numberOfStreams > 1 || r.nextBoolean()) {
+ streams = new String[numberOfStreams];
+ for (int i = 0; i < numberOfStreams; ++i) {
+ streams[i] = "stream" + i;
+ }
+ }
+
+ final String[] attributes = new String[numberOfAttributes];
+ for (int i = 0; i < attributes.length; ++i) {
+ attributes[i] = "a" + i;
+ }
+
+ switch (testCase) {
+ case 0:
+ this.declareSimple(declarer, streams, attributes);
+ break;
+ default:
+ this.declareNonDirect(declarer, streams, attributes);
+ }
+
+ if (streams == null) {
+ streams = new String[] { Utils.DEFAULT_STREAM_ID };
+ }
+
+ for (String stream : streams) {
+ final TypeInformation<?> type = declarer.getOutputType(stream);
+
+ if (numberOfAttributes == 1) {
+ Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
+ Assert.assertEquals(type.getTypeClass(), Object.class);
+ } else {
+ Assert.assertEquals(numberOfAttributes, type.getArity());
+ Assert.assertTrue(type.isTupleType());
+ }
+ }
+ }
+
+ private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+ final String[] attributes) {
+
+ if (streams != null) {
+ for (String stream : streams) {
+ declarer.declareStream(stream, new Fields(attributes));
+ }
+ } else {
+ declarer.declare(new Fields(attributes));
+ }
+ }
+
+ private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+ final String[] attributes) {
+
+ if (streams != null) {
+ for (String stream : streams) {
+ declarer.declareStream(stream, false, new Fields(attributes));
+ }
+ } else {
+ declarer.declare(false, new Fields(attributes));
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUndeclared() {
+ final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+ declarer.getOutputType("unknownStreamId");
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeclareDirect() {
+ new FlinkOutputFieldsDeclarer().declare(true, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeclareDirect2() {
+ new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+ }
+
+ @Test
+ public void testGetGroupingFieldIndexes() {
+ final int numberOfAttributes = 5 + this.r.nextInt(21);
+ final String[] attributes = new String[numberOfAttributes];
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ attributes[i] = "a" + i;
+ }
+
+ final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+ declarer.declare(new Fields(attributes));
+
+ final int numberOfKeys = 1 + this.r.nextInt(25);
+ final LinkedList<String> groupingFields = new LinkedList<String>();
+ final boolean[] indexes = new boolean[numberOfAttributes];
+
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ if (this.r.nextInt(26) < numberOfKeys) {
+ groupingFields.add(attributes[i]);
+ indexes[i] = true;
+ } else {
+ indexes[i] = false;
+ }
+ }
+
+ final int[] expectedResult = new int[groupingFields.size()];
+ int j = 0;
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ if (indexes[i]) {
+ expectedResult[j++] = i;
+ }
+ }
+
+ final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
+ groupingFields);
+
+ Assert.assertEquals(expectedResult.length, result.length);
+ for (int i = 0; i < expectedResult.length; ++i) {
+ Assert.assertEquals(expectedResult[i], result[i]);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
new file mode 100644
index 0000000..c3cb7d7
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlinkStormStreamSelectorTest {
+
+ @Test
+ public void testSelector() {
+ FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
+ SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+ Iterator<String> result;
+
+ tuple.streamId = "stream1";
+ result = selector.select(tuple).iterator();
+ Assert.assertEquals("stream1", result.next());
+ Assert.assertFalse(result.hasNext());
+
+ tuple.streamId = "stream2";
+ result = selector.select(tuple).iterator();
+ Assert.assertEquals("stream2", result.next());
+ Assert.assertFalse(result.hasNext());
+
+ tuple.streamId = "stream1";
+ result = selector.select(tuple).iterator();
+ Assert.assertEquals("stream1", result.next());
+ Assert.assertFalse(result.hasNext());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..bd9ea3f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.HashMap;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+
+import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
+import org.junit.Test;
+
+
+/*
+ * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here,
+ * because those are tested in StormWrapperSetupHelperTest.
+ */
+public class FlinkTopologyContextTest extends AbstractTest {
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAddTaskHook() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .addTaskHook(null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetHooks() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .getHooks();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test(expected = UnsupportedOperationException.class)
+ public void testRegisteredMetric1() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .registerMetric(null, (ICombiner) null, 0);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test(expected = UnsupportedOperationException.class)
+ public void testRegisteredMetric2() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .registerMetric(null, (IReducer) null, 0);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testRegisteredMetric3() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .registerMetric(null, (IMetric) null, 0);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetRegisteredMetricByName() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .getRegisteredMetricByName(null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSetAllSubscribedState() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .setAllSubscribedState(null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSetSubscribedState1() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .setSubscribedState(null, null);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSetSubscribedState2() {
+ new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
+ new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
+ null, null, null, null, null, null, null, null, null, null, null, null, null)
+ .setSubscribedState(null, null, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
index ec48719..b499373 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
public class TestDummyBolt implements IRichBolt {
private static final long serialVersionUID = 6893611247443121322L;
@@ -31,12 +32,27 @@ public class TestDummyBolt implements IRichBolt {
public final static String shuffleStreamId = "shuffleStream";
public final static String groupingStreamId = "groupingStream";
+ private boolean emit = true;
+ private TopologyContext context;
+ private OutputCollector collector;
+
@SuppressWarnings("rawtypes")
@Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.context = context;
+ this.collector = collector;
+ }
@Override
- public void execute(Tuple input) {}
+ public void execute(Tuple input) {
+ if (this.context.getThisTaskIndex() == 0) {
+ this.collector.emit(shuffleStreamId, input.getValues());
+ }
+ if (this.emit) {
+ this.collector.emit(groupingStreamId, new Values("bolt", this.context));
+ this.emit = false;
+ }
+ }
@Override
public void cleanup() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
index 62705b8..345ca12 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
@@ -23,6 +23,7 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class TestDummySpout implements IRichSpout {
@@ -30,9 +31,16 @@ public class TestDummySpout implements IRichSpout {
public final static String spoutStreamId = "spout-stream";
+ private boolean emit = true;
+ private TopologyContext context;
+ private SpoutOutputCollector collector;
+
@SuppressWarnings("rawtypes")
@Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.context = context;
+ this.collector = collector;
+ }
@Override
public void close() {}
@@ -44,7 +52,12 @@ public class TestDummySpout implements IRichSpout {
public void deactivate() {}
@Override
- public void nextTuple() {}
+ public void nextTuple() {
+ if (this.emit) {
+ this.collector.emit(new Values(this.context));
+ this.emit = false;
+ }
+ }
@Override
public void ack(Object msgId) {}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
index 5699219..c8e5584 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
@@ -16,6 +16,8 @@
*/
package org.apache.flink.stormcompatibility.util;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
@@ -27,12 +29,22 @@ import backtype.storm.tuple.Tuple;
public class TestSink implements IRichBolt {
private static final long serialVersionUID = 4314871456719370877L;
+ public final static List<TopologyContext> result = new LinkedList<TopologyContext>();
+
@SuppressWarnings("rawtypes")
@Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ result.add(context);
+ }
@Override
- public void execute(Tuple input) {}
+ public void execute(Tuple input) {
+ if (input.size() == 1) {
+ result.add((TopologyContext) input.getValue(0));
+ } else {
+ result.add((TopologyContext) input.getValue(1));
+ }
+ }
@Override
public void cleanup() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
index 381e130..b44e8a1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.stormcompatibility.wrappers;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Test;
@@ -43,6 +45,8 @@ public class FiniteStormSpoutWrapperTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
wrapper.setRuntimeContext(taskContext);
@@ -59,6 +63,8 @@ public class FiniteStormSpoutWrapperTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
wrapper.setRuntimeContext(taskContext);
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
deleted file mode 100644
index eef35cf..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-class FiniteTestSpout implements IRichSpout {
- private static final long serialVersionUID = 7992419478267824279L;
-
- private int numberOfOutputTuples;
- private SpoutOutputCollector collector;
-
- public FiniteTestSpout(final int numberOfOutputTuples) {
- this.numberOfOutputTuples = numberOfOutputTuples;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void close() {/* nothing to do */}
-
- @Override
- public void activate() {/* nothing to do */}
-
- @Override
- public void deactivate() {/* nothing to do */}
-
- @Override
- public void nextTuple() {
- if (--this.numberOfOutputTuples >= 0) {
- this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
- }
- }
-
- @Override
- public void ack(final Object msgId) {/* nothing to do */}
-
- @Override
- public void fail(final Object msgId) {/* nothing to do */}
-
- @Override
- public void declareOutputFields(final OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("dummy"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
deleted file mode 100644
index c0a6ed3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Iterator;
-
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class FlinkStormStreamSelectorTest {
-
- @Test
- public void testSelector() {
- FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
- SplitStreamType<Object> tuple = new SplitStreamType<Object>();
- Iterator<String> result;
-
- tuple.streamId = "stream1";
- result = selector.select(tuple).iterator();
- Assert.assertEquals("stream1", result.next());
- Assert.assertFalse(result.hasNext());
-
- tuple.streamId = "stream2";
- result = selector.select(tuple).iterator();
- Assert.assertEquals("stream2", result.next());
- Assert.assertFalse(result.hasNext());
-
- tuple.streamId = "stream1";
- result = selector.select(tuple).iterator();
- Assert.assertEquals("stream1", result.next());
- Assert.assertFalse(result.hasNext());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..738eb1e
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class SetupOutputFieldsDeclarerTest extends AbstractTest {
+
+ @Test
+ public void testDeclare() {
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+
+ int numberOfAttributes = this.r.nextInt(26);
+ declarer.declare(createSchema(numberOfAttributes));
+ Assert.assertEquals(1, declarer.outputSchemas.size());
+ Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
+ .intValue());
+
+ final String sid = "streamId";
+ numberOfAttributes = this.r.nextInt(26);
+ declarer.declareStream(sid, createSchema(numberOfAttributes));
+ Assert.assertEquals(2, declarer.outputSchemas.size());
+ Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
+ }
+
+ private Fields createSchema(final int numberOfAttributes) {
+ final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ schema.add("a" + i);
+ }
+ return new Fields(schema);
+ }
+
+ @Test
+ public void testDeclareDirect() {
+ new SetupOutputFieldsDeclarer().declare(false, new Fields());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeclareDirectFail() {
+ new SetupOutputFieldsDeclarer().declare(true, new Fields());
+ }
+
+ @Test
+ public void testDeclareStream() {
+ new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDeclareStreamFail() {
+ new SetupOutputFieldsDeclarer().declareStream(null, new Fields());
+ }
+
+ @Test
+ public void testDeclareFullStream() {
+ new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDeclareFullStreamFailNonDefaultStream() {
+ new SetupOutputFieldsDeclarer().declareStream(null, false, new Fields());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeclareFullStreamFailDirect() {
+ new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 5cfb151..6817593 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -61,9 +61,9 @@ public class StormBoltWrapperTest extends AbstractTest {
@Test(expected = IllegalArgumentException.class)
public void testWrapperRawType() throws Exception {
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy1", "dummy2"));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class),
new String[] { Utils.DEFAULT_STREAM_ID });
@@ -71,26 +71,26 @@ public class StormBoltWrapperTest extends AbstractTest {
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes1() throws Exception {
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
final String[] schema = new String[26];
for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
}
@Test(expected = IllegalArgumentException.class)
public void testWrapperToManyAttributes2() throws Exception {
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
final String[] schema = new String[26];
for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
}
@@ -133,12 +133,14 @@ public class StormBoltWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final IRichBolt bolt = mock(IRichBolt.class);
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields(schema));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
wrapper.setup(mock(Output.class), taskContext);
@@ -163,6 +165,8 @@ public class StormBoltWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final Output output = mock(Output.class);
@@ -209,14 +213,17 @@ public class StormBoltWrapperTest extends AbstractTest {
final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
- .thenReturn(flinkConfig);
+ .thenReturn(flinkConfig);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
@@ -249,8 +256,11 @@ public class StormBoltWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final IRichBolt bolt = mock(IRichBolt.class);
+
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
wrapper.setup(mock(Output.class), taskContext);
@@ -275,9 +285,9 @@ public class StormBoltWrapperTest extends AbstractTest {
public void testClose() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index a4eea7e..77f1b05 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -22,7 +22,9 @@ import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Assert;
@@ -46,12 +48,14 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
@SuppressWarnings("unchecked")
@Test
public void testRunExecuteFixedNumber() throws Exception {
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = mock(IRichSpout.class);
final int numberOfCalls = this.r.nextInt(50);
@@ -73,6 +77,8 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
@@ -94,11 +100,12 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest {
StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
spout);
- spoutWrapper.setRuntimeContext(taskContext);
spoutWrapper.cancel();
final TestContext collector = new TestContext();
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
deleted file mode 100644
index 561939f..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-public class StormOutputFieldsDeclarerTest extends AbstractTest {
-
- @Test
- public void testDeclare() {
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
-
- int numberOfAttributes = this.r.nextInt(26);
- declarer.declare(createSchema(numberOfAttributes));
- Assert.assertEquals(1, declarer.outputSchemas.size());
- Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
- .intValue());
-
- final String sid = "streamId";
- numberOfAttributes = this.r.nextInt(26);
- declarer.declareStream(sid, createSchema(numberOfAttributes));
- Assert.assertEquals(2, declarer.outputSchemas.size());
- Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
- }
-
- private Fields createSchema(final int numberOfAttributes) {
- final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
- for (int i = 0; i < numberOfAttributes; ++i) {
- schema.add("a" + i);
- }
- return new Fields(schema);
- }
-
- @Test
- public void testDeclareDirect() {
- new StormOutputFieldsDeclarer().declare(false, new Fields());
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeclareDirectFail() {
- new StormOutputFieldsDeclarer().declare(true, new Fields());
- }
-
- @Test
- public void testDeclareStream() {
- new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDeclareStreamFail() {
- new StormOutputFieldsDeclarer().declareStream(null, new Fields());
- }
-
- @Test
- public void testDeclareFullStream() {
- new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDeclareFullStreamFailNonDefaultStream() {
- new StormOutputFieldsDeclarer().declareStream(null, false, new Fields());
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeclareFullStreamFailDirect() {
- new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 04dc48d..f4fb4da 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -25,9 +25,11 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -89,8 +91,12 @@ public class StormSpoutWrapperTest extends AbstractTest {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+ when(taskContext.getTaskName()).thenReturn("name");
final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+
+
final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
spoutWrapper.setRuntimeContext(taskContext);
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
index 7497ffc..c799d63 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
@@ -14,29 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.stormcompatibility.wrappers;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IComponent;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+import org.apache.flink.stormcompatibility.api.TestTopologyBuilder;
import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.TestDummyBolt;
+import org.apache.flink.stormcompatibility.util.TestDummySpout;
+import org.apache.flink.stormcompatibility.util.TestSink;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.Sets;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@PowerMockIgnore("javax.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormWrapperSetupHelperTest extends AbstractTest {
@@ -65,9 +82,9 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
boltOrSpout = mock(IRichBolt.class);
}
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy1", "dummy2"));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
@@ -83,13 +100,13 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
boltOrSpout = mock(IRichBolt.class);
}
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
final String[] schema = new String[26];
for (int i = 0; i < schema.length; ++i) {
schema[i] = "a" + i;
}
declarer.declare(new Fields(schema));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null);
}
@@ -119,9 +136,9 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
boltOrSpout = mock(IRichBolt.class);
}
- final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+ final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields(schema));
- PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+ PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
HashMap<String, Integer> attributes = new HashMap<String, Integer>();
attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
@@ -132,4 +149,167 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
}
+ @Test
+ public void testCreateTopologyContext() {
+ HashMap<String, Integer> dops = new HashMap<String, Integer>();
+ dops.put("spout1", 1);
+ dops.put("spout2", 3);
+ dops.put("bolt1", 1);
+ dops.put("bolt2", 2);
+ dops.put("sink", 1);
+
+ HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
+ taskCounter.put("spout1", 0);
+ taskCounter.put("spout2", 0);
+ taskCounter.put("bolt1", 0);
+ taskCounter.put("bolt2", 0);
+ taskCounter.put("sink", 0);
+
+ HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
+ operators.put("spout1", new TestDummySpout());
+ operators.put("spout2", new TestDummySpout());
+ operators.put("bolt1", new TestDummyBolt());
+ operators.put("bolt2", new TestDummyBolt());
+ operators.put("sink", new TestSink());
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
+ builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
+ builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+ builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+ builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
+ .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+ .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+ final int maxRetry = 3;
+ int counter;
+ for (counter = 0; counter < maxRetry; ++counter) {
+ LocalCluster cluster = new LocalCluster();
+ Config c = new Config();
+ c.setNumAckers(0);
+ cluster.submitTopology("test", c, builder.createTopology());
+ Utils.sleep((counter + 1) * 5000);
+ cluster.shutdown();
+
+ if (TestSink.result.size() == 8) {
+ break;
+ }
+ }
+ Assert.assertTrue(counter < maxRetry);
+
+ TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();
+
+ flinkBuilder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
+ flinkBuilder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
+ flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+ flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+ flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
+ .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+ .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+ .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+ flinkBuilder.createTopology();
+ StormTopology stormTopology = flinkBuilder.getStormTopology();
+
+ Set<Integer> taskIds = new HashSet<Integer>();
+
+ for (TopologyContext expectedContext : TestSink.result) {
+ final String thisComponentId = expectedContext.getThisComponentId();
+ int index = taskCounter.get(thisComponentId);
+
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+ when(context.getTaskName()).thenReturn(thisComponentId);
+ when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
+ when(context.getIndexOfThisSubtask()).thenReturn(index);
+ taskCounter.put(thisComponentId, ++index);
+
+ Config stormConfig = new Config();
+ stormConfig.put(StormWrapperSetupHelper.TOPOLOGY_NAME, "test");
+
+ TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
+ context, operators.get(thisComponentId), stormTopology, stormConfig);
+
+ ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
+ ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
+
+ Assert.assertNull(topologyContext.getCodeDir());
+ Assert.assertNull(common.get_json_conf());
+ Assert.assertNull(topologyContext.getExecutorData(null));
+ Assert.assertNull(topologyContext.getPIDDir());
+ Assert.assertNull(topologyContext.getResource(null));
+ Assert.assertNull(topologyContext.getSharedExecutor());
+ Assert.assertNull(expectedContext.getTaskData(null));
+ Assert.assertNull(topologyContext.getThisWorkerPort());
+
+ Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
+
+ Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs());
+ Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint());
+ Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams());
+ Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
+ Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
+ topologyContext.getComponentStreams(thisComponentId));
+ Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
+ Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
+ Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
+ Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
+ Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
+
+ for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
+ Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
+ }
+
+ for (String componentId : expectedContext.getComponentIds()) {
+ Assert.assertEquals(expectedContext.getSources(componentId),
+ topologyContext.getSources(componentId));
+ Assert.assertEquals(expectedContext.getTargets(componentId),
+ topologyContext.getTargets(componentId));
+
+ for (String streamId : expectedContext.getComponentStreams(componentId)) {
+ Assert.assertEquals(
+ expectedContext.getComponentOutputFields(componentId, streamId).toList(),
+ topologyContext.getComponentOutputFields(componentId, streamId).toList());
+ }
+ }
+
+ for (String streamId : expectedContext.getThisStreams()) {
+ Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
+ topologyContext.getThisOutputFields(streamId).toList());
+ }
+
+ HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
+ Set<Integer> allTaskIds = new HashSet<Integer>();
+ for (String componentId : expectedContext.getComponentIds()) {
+ List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
+ List<Integer> tasks = topologyContext.getComponentTasks(componentId);
+
+ Iterator<Integer> p_it = possibleTasks.iterator();
+ Iterator<Integer> t_it = tasks.iterator();
+ while(p_it.hasNext()) {
+ Assert.assertTrue(t_it.hasNext());
+ Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
+ Assert.assertTrue(allTaskIds.add(t_it.next()));
+ }
+ Assert.assertFalse(t_it.hasNext());
+ }
+
+ Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
+ Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
+
+ try {
+ topologyContext.getHooks();
+ Assert.fail();
+ } catch (UnsupportedOperationException e) { /* expected */ }
+
+ try {
+ topologyContext.getRegisteredMetricByName(null);
+ Assert.fail();
+ } catch (UnsupportedOperationException e) { /* expected */ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
index 64b3e28..d3776fb 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -20,7 +20,7 @@ import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Values;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
import java.io.IOException;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
index 6fb764d..5efff66 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
@@ -18,7 +18,7 @@
package org.apache.flink.stormcompatibility.util;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
/**
* Implements a Storm Spout that reads String[] data stored in the memory. The spout stops
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
index c992b6b..5f637d3 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
@@ -49,10 +49,10 @@ public class SplitBoltTopology {
final String[] tokens = outputPath.split(":");
final String outputFile = tokens[tokens.length - 1];
builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
- .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+ .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
} else {
builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
- .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+ .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
}
return builder;