You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:16 UTC
[02/19] flink git commit: [FLINK-5522] [storm compatibility] Move
Storm LocalCluster based test to a separate class
[FLINK-5522] [storm compatibility] Move Storm LocalCluster based test to a separate class
This fixes the problem that the Storm LocalCluster can't run with powermock
This closes #3138
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d05fc377
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d05fc377
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d05fc377
Branch: refs/heads/master
Commit: d05fc377ee688b231fb1b0daeb8a34fd054f3ca1
Parents: 5313459
Author: liuyuzhong7 <li...@gmail.com>
Authored: Thu Feb 9 16:16:15 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Feb 18 19:19:34 2017 +0100
----------------------------------------------------------------------
.../storm/wrappers/WrapperSetupHelperTest.java | 167 +---------------
.../WrapperSetupInLocalClusterTest.java | 190 +++++++++++++++++++
2 files changed, 191 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d05fc377/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 5e29ac4..5f38705 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -17,29 +17,15 @@
package org.apache.flink.storm.wrappers;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.task.TopologyContext;
+import org.apache.flink.storm.util.AbstractTest;
import org.apache.storm.topology.IComponent;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
-
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.util.AbstractTest;
-import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.storm.util.TestDummySpout;
-import org.apache.flink.storm.util.TestSink;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -47,14 +33,9 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
import static java.util.Collections.singleton;
-
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WrapperSetupHelper.class)
@@ -150,150 +131,4 @@ public class WrapperSetupHelperTest extends AbstractTest {
boltOrSpout,
numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null));
}
-
- @Test
- public void testCreateTopologyContext() {
- HashMap<String, Integer> dops = new HashMap<String, Integer>();
- dops.put("spout1", 1);
- dops.put("spout2", 3);
- dops.put("bolt1", 1);
- dops.put("bolt2", 2);
- dops.put("sink", 1);
-
- HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
- taskCounter.put("spout1", 0);
- taskCounter.put("spout2", 0);
- taskCounter.put("bolt1", 0);
- taskCounter.put("bolt2", 0);
- taskCounter.put("sink", 0);
-
- HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
- operators.put("spout1", new TestDummySpout());
- operators.put("spout2", new TestDummySpout());
- operators.put("bolt1", new TestDummyBolt());
- operators.put("bolt2", new TestDummyBolt());
- operators.put("sink", new TestSink());
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
- builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
- builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
- builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
- builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
- .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
- .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
- .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
- .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
-
-// LocalCluster cluster = new LocalCluster();
-// Config c = new Config();
-// c.setNumAckers(0);
-// cluster.submitTopology("test", c, builder.createTopology());
-//
-// while (TestSink.result.size() != 8) {
-// Utils.sleep(100);
-// }
-// cluster.shutdown();
-
- final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
- StormTopology stormTopology = flinkBuilder.getStormTopology();
-
- Set<Integer> taskIds = new HashSet<Integer>();
-
- for (TopologyContext expectedContext : TestSink.result) {
- final String thisComponentId = expectedContext.getThisComponentId();
- int index = taskCounter.get(thisComponentId);
-
- StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
- when(context.getTaskName()).thenReturn(thisComponentId);
- when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
- when(context.getIndexOfThisSubtask()).thenReturn(index);
- taskCounter.put(thisComponentId, ++index);
-
- Config stormConfig = new Config();
- stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test");
-
- TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context,
- operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig);
-
- ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
- ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
-
- Assert.assertNull(topologyContext.getCodeDir());
- Assert.assertNull(common.get_json_conf());
- Assert.assertNull(topologyContext.getExecutorData(null));
- Assert.assertNull(topologyContext.getPIDDir());
- Assert.assertNull(topologyContext.getResource(null));
- Assert.assertNull(topologyContext.getSharedExecutor());
- Assert.assertNull(expectedContext.getTaskData(null));
- Assert.assertNull(topologyContext.getThisWorkerPort());
-
- Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
-
- Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs());
- Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint());
- Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams());
- Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
- Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
- topologyContext.getComponentStreams(thisComponentId));
- Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
- Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
- Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
- Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
- Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
-
- for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
- Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
- }
-
- for (String componentId : expectedContext.getComponentIds()) {
- Assert.assertEquals(expectedContext.getSources(componentId),
- topologyContext.getSources(componentId));
- Assert.assertEquals(expectedContext.getTargets(componentId),
- topologyContext.getTargets(componentId));
-
- for (String streamId : expectedContext.getComponentStreams(componentId)) {
- Assert.assertEquals(
- expectedContext.getComponentOutputFields(componentId, streamId).toList(),
- topologyContext.getComponentOutputFields(componentId, streamId).toList());
- }
- }
-
- for (String streamId : expectedContext.getThisStreams()) {
- Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
- topologyContext.getThisOutputFields(streamId).toList());
- }
-
- HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
- Set<Integer> allTaskIds = new HashSet<Integer>();
- for (String componentId : expectedContext.getComponentIds()) {
- List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
- List<Integer> tasks = topologyContext.getComponentTasks(componentId);
-
- Iterator<Integer> p_it = possibleTasks.iterator();
- Iterator<Integer> t_it = tasks.iterator();
- while(p_it.hasNext()) {
- Assert.assertTrue(t_it.hasNext());
- Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
- Assert.assertTrue(allTaskIds.add(t_it.next()));
- }
- Assert.assertFalse(t_it.hasNext());
- }
-
- Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
- Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
-
- try {
- topologyContext.getHooks();
- Assert.fail();
- } catch (UnsupportedOperationException e) { /* expected */ }
-
- try {
- topologyContext.getRegisteredMetricByName(null);
- Assert.fail();
- } catch (UnsupportedOperationException e) { /* expected */ }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d05fc377/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
new file mode 100644
index 0000000..00173df
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WrapperSetupInLocalClusterTest extends AbstractTest {
+
+ @Test
+ public void testCreateTopologyContext() {
+ HashMap<String, Integer> dops = new HashMap<String, Integer>();
+ dops.put("spout1", 1);
+ dops.put("spout2", 3);
+ dops.put("bolt1", 1);
+ dops.put("bolt2", 2);
+ dops.put("sink", 1);
+
+ HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
+ taskCounter.put("spout1", 0);
+ taskCounter.put("spout2", 0);
+ taskCounter.put("bolt1", 0);
+ taskCounter.put("bolt2", 0);
+ taskCounter.put("sink", 0);
+
+ HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
+ operators.put("spout1", new TestDummySpout());
+ operators.put("spout2", new TestDummySpout());
+ operators.put("bolt1", new TestDummyBolt());
+ operators.put("bolt2", new TestDummyBolt());
+ operators.put("sink", new TestSink());
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
+ builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
+ builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+ builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+ builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
+ .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
+ .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+ .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
+ .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+ LocalCluster cluster = new LocalCluster();
+ Config c = new Config();
+ c.setNumAckers(0);
+ cluster.submitTopology("test", c, builder.createTopology());
+
+ while (TestSink.result.size() != 8) {
+ Utils.sleep(100);
+ }
+ cluster.shutdown();
+ final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
+ StormTopology stormTopology = flinkBuilder.getStormTopology();
+
+ Set<Integer> taskIds = new HashSet<Integer>();
+
+ for (TopologyContext expectedContext : TestSink.result) {
+ final String thisComponentId = expectedContext.getThisComponentId();
+ int index = taskCounter.get(thisComponentId);
+
+ StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+ when(context.getTaskName()).thenReturn(thisComponentId);
+ when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
+ when(context.getIndexOfThisSubtask()).thenReturn(index);
+ taskCounter.put(thisComponentId, ++index);
+
+ Config stormConfig = new Config();
+ stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test");
+
+ TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context,
+ operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig);
+
+ ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
+ ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
+
+ Assert.assertNull(topologyContext.getCodeDir());
+ Assert.assertNull(common.get_json_conf());
+ Assert.assertNull(topologyContext.getExecutorData(null));
+ Assert.assertNull(topologyContext.getPIDDir());
+ Assert.assertNull(topologyContext.getResource(null));
+ Assert.assertNull(topologyContext.getSharedExecutor());
+ Assert.assertNull(expectedContext.getTaskData(null));
+ Assert.assertNull(topologyContext.getThisWorkerPort());
+
+ Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
+
+ Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs());
+ Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint());
+ Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams());
+ Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
+ Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
+ topologyContext.getComponentStreams(thisComponentId));
+ Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
+ Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
+ Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
+ Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
+ Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
+
+ for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
+ Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
+ }
+
+ for (String componentId : expectedContext.getComponentIds()) {
+ Assert.assertEquals(expectedContext.getSources(componentId),
+ topologyContext.getSources(componentId));
+ Assert.assertEquals(expectedContext.getTargets(componentId),
+ topologyContext.getTargets(componentId));
+
+ for (String streamId : expectedContext.getComponentStreams(componentId)) {
+ Assert.assertEquals(
+ expectedContext.getComponentOutputFields(componentId, streamId).toList(),
+ topologyContext.getComponentOutputFields(componentId, streamId).toList());
+ }
+ }
+
+ for (String streamId : expectedContext.getThisStreams()) {
+ Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
+ topologyContext.getThisOutputFields(streamId).toList());
+ }
+
+ HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
+ Set<Integer> allTaskIds = new HashSet<Integer>();
+ for (String componentId : expectedContext.getComponentIds()) {
+ List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
+ List<Integer> tasks = topologyContext.getComponentTasks(componentId);
+
+ Iterator<Integer> p_it = possibleTasks.iterator();
+ Iterator<Integer> t_it = tasks.iterator();
+ while(p_it.hasNext()) {
+ Assert.assertTrue(t_it.hasNext());
+ Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
+ Assert.assertTrue(allTaskIds.add(t_it.next()));
+ }
+ Assert.assertFalse(t_it.hasNext());
+ }
+
+ Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
+ Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
+
+ try {
+ topologyContext.getHooks();
+ Assert.fail();
+ } catch (UnsupportedOperationException e) { /* expected */ }
+
+ try {
+ topologyContext.getRegisteredMetricByName(null);
+ Assert.fail();
+ } catch (UnsupportedOperationException e) { /* expected */ }
+ }
+ }
+
+}