You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:16 UTC

[02/19] flink git commit: [FLINK-5522] [storm compatibility] Move Storm LocalCluster based test to a separate class

[FLINK-5522] [storm compatibility] Move Storm LocalCluster based test to a separate class

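This fixes the problem that the Storm LocalCluster cannot run under PowerMock: the LocalCluster-based test is moved out of the PowerMockRunner-based WrapperSetupHelperTest into its own test class that runs without PowerMock.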

This closes #3138


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d05fc377
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d05fc377
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d05fc377

Branch: refs/heads/master
Commit: d05fc377ee688b231fb1b0daeb8a34fd054f3ca1
Parents: 5313459
Author: liuyuzhong7 <li...@gmail.com>
Authored: Thu Feb 9 16:16:15 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Feb 18 19:19:34 2017 +0100

----------------------------------------------------------------------
 .../storm/wrappers/WrapperSetupHelperTest.java  | 167 +---------------
 .../WrapperSetupInLocalClusterTest.java         | 190 +++++++++++++++++++
 2 files changed, 191 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d05fc377/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 5e29ac4..5f38705 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -17,29 +17,15 @@
 
 package org.apache.flink.storm.wrappers;
 
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.task.TopologyContext;
+import org.apache.flink.storm.util.AbstractTest;
 import org.apache.storm.topology.IComponent;
 import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
-
-import org.apache.flink.storm.api.FlinkTopology;
-import org.apache.flink.storm.util.AbstractTest;
-import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.storm.util.TestDummySpout;
-import org.apache.flink.storm.util.TestSink;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -47,14 +33,9 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
 
 import static java.util.Collections.singleton;
-
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WrapperSetupHelper.class)
@@ -150,150 +131,4 @@ public class WrapperSetupHelperTest extends AbstractTest {
 				boltOrSpout,
 				numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null));
 	}
-
-	@Test
-	public void testCreateTopologyContext() {
-		HashMap<String, Integer> dops = new HashMap<String, Integer>();
-		dops.put("spout1", 1);
-		dops.put("spout2", 3);
-		dops.put("bolt1", 1);
-		dops.put("bolt2", 2);
-		dops.put("sink", 1);
-
-		HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
-		taskCounter.put("spout1", 0);
-		taskCounter.put("spout2", 0);
-		taskCounter.put("bolt1", 0);
-		taskCounter.put("bolt2", 0);
-		taskCounter.put("sink", 0);
-
-		HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
-		operators.put("spout1", new TestDummySpout());
-		operators.put("spout2", new TestDummySpout());
-		operators.put("bolt1", new TestDummyBolt());
-		operators.put("bolt2", new TestDummyBolt());
-		operators.put("sink", new TestSink());
-
-		TopologyBuilder builder = new TopologyBuilder();
-
-		builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
-		builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
-		builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
-		builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
-		builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
-				.shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
-				.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
-				.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
-				.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
-
-//		LocalCluster cluster = new LocalCluster();
-//		Config c = new Config();
-//		c.setNumAckers(0);
-//		cluster.submitTopology("test", c, builder.createTopology());
-//
-//		while (TestSink.result.size() != 8) {
-//			Utils.sleep(100);
-//		}
-//		cluster.shutdown();
-
-		final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
-		StormTopology stormTopology = flinkBuilder.getStormTopology();
-
-		Set<Integer> taskIds = new HashSet<Integer>();
-
-		for (TopologyContext expectedContext : TestSink.result) {
-			final String thisComponentId = expectedContext.getThisComponentId();
-			int index = taskCounter.get(thisComponentId);
-
-			StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-			when(context.getTaskName()).thenReturn(thisComponentId);
-			when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
-			when(context.getIndexOfThisSubtask()).thenReturn(index);
-			taskCounter.put(thisComponentId, ++index);
-
-			Config stormConfig = new Config();
-			stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test");
-
-			TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context,
-					operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig);
-
-			ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
-			ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
-
-			Assert.assertNull(topologyContext.getCodeDir());
-			Assert.assertNull(common.get_json_conf());
-			Assert.assertNull(topologyContext.getExecutorData(null));
-			Assert.assertNull(topologyContext.getPIDDir());
-			Assert.assertNull(topologyContext.getResource(null));
-			Assert.assertNull(topologyContext.getSharedExecutor());
-			Assert.assertNull(expectedContext.getTaskData(null));
-			Assert.assertNull(topologyContext.getThisWorkerPort());
-
-			Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
-
-			Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs());
-			Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint());
-			Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams());
-			Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
-			Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
-					topologyContext.getComponentStreams(thisComponentId));
-			Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
-			Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
-			Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
-			Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
-			Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
-
-			for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
-				Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
-			}
-
-			for (String componentId : expectedContext.getComponentIds()) {
-				Assert.assertEquals(expectedContext.getSources(componentId),
-						topologyContext.getSources(componentId));
-				Assert.assertEquals(expectedContext.getTargets(componentId),
-						topologyContext.getTargets(componentId));
-
-				for (String streamId : expectedContext.getComponentStreams(componentId)) {
-					Assert.assertEquals(
-							expectedContext.getComponentOutputFields(componentId, streamId).toList(),
-							topologyContext.getComponentOutputFields(componentId, streamId).toList());
-				}
-			}
-
-			for (String streamId : expectedContext.getThisStreams()) {
-				Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
-						topologyContext.getThisOutputFields(streamId).toList());
-			}
-
-			HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
-			Set<Integer> allTaskIds = new HashSet<Integer>();
-			for (String componentId : expectedContext.getComponentIds()) {
-				List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
-				List<Integer> tasks = topologyContext.getComponentTasks(componentId);
-
-				Iterator<Integer> p_it = possibleTasks.iterator();
-				Iterator<Integer> t_it = tasks.iterator();
-				while(p_it.hasNext()) {
-					Assert.assertTrue(t_it.hasNext());
-					Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
-					Assert.assertTrue(allTaskIds.add(t_it.next()));
-				}
-				Assert.assertFalse(t_it.hasNext());
-			}
-
-			Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
-			Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
-
-			try {
-				topologyContext.getHooks();
-				Assert.fail();
-			} catch (UnsupportedOperationException e) { /* expected */ }
-
-			try {
-				topologyContext.getRegisteredMetricByName(null);
-				Assert.fail();
-			} catch (UnsupportedOperationException e) { /* expected */ }
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d05fc377/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
new file mode 100644
index 0000000..00173df
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WrapperSetupInLocalClusterTest extends AbstractTest {
+
+	@Test
+	public void testCreateTopologyContext() {
+		HashMap<String, Integer> dops = new HashMap<String, Integer>();
+		dops.put("spout1", 1);
+		dops.put("spout2", 3);
+		dops.put("bolt1", 1);
+		dops.put("bolt2", 2);
+		dops.put("sink", 1);
+
+		HashMap<String, Integer> taskCounter = new HashMap<String, Integer>();
+		taskCounter.put("spout1", 0);
+		taskCounter.put("spout2", 0);
+		taskCounter.put("bolt1", 0);
+		taskCounter.put("bolt2", 0);
+		taskCounter.put("sink", 0);
+
+		HashMap<String, IComponent> operators = new HashMap<String, IComponent>();
+		operators.put("spout1", new TestDummySpout());
+		operators.put("spout2", new TestDummySpout());
+		operators.put("bolt1", new TestDummyBolt());
+		operators.put("bolt2", new TestDummyBolt());
+		operators.put("sink", new TestSink());
+
+		TopologyBuilder builder = new TopologyBuilder();
+
+		builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1"));
+		builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2"));
+		builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+		builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+		builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
+				.shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
+				.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+				.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
+				.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+		LocalCluster cluster = new LocalCluster();
+		Config c = new Config();
+		c.setNumAckers(0);
+		cluster.submitTopology("test", c, builder.createTopology());
+
+		while (TestSink.result.size() != 8) {
+			Utils.sleep(100);
+		}
+		cluster.shutdown();
+		final FlinkTopology flinkBuilder = FlinkTopology.createTopology(builder);
+		StormTopology stormTopology = flinkBuilder.getStormTopology();
+
+		Set<Integer> taskIds = new HashSet<Integer>();
+
+		for (TopologyContext expectedContext : TestSink.result) {
+			final String thisComponentId = expectedContext.getThisComponentId();
+			int index = taskCounter.get(thisComponentId);
+
+			StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+			when(context.getTaskName()).thenReturn(thisComponentId);
+			when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
+			when(context.getIndexOfThisSubtask()).thenReturn(index);
+			taskCounter.put(thisComponentId, ++index);
+
+			Config stormConfig = new Config();
+			stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, "test");
+
+			TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(context,
+				operators.get(thisComponentId), thisComponentId, stormTopology, stormConfig);
+
+			ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId);
+			ComponentCommon common = topologyContext.getComponentCommon(thisComponentId);
+
+			Assert.assertNull(topologyContext.getCodeDir());
+			Assert.assertNull(common.get_json_conf());
+			Assert.assertNull(topologyContext.getExecutorData(null));
+			Assert.assertNull(topologyContext.getPIDDir());
+			Assert.assertNull(topologyContext.getResource(null));
+			Assert.assertNull(topologyContext.getSharedExecutor());
+			Assert.assertNull(expectedContext.getTaskData(null));
+			Assert.assertNull(topologyContext.getThisWorkerPort());
+
+			Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
+
+			Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs());
+			Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint());
+			Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams());
+			Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds());
+			Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
+				topologyContext.getComponentStreams(thisComponentId));
+			Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId());
+			Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources());
+			Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams());
+			Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets());
+			Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size());
+
+			for (int taskId : topologyContext.getComponentTasks(thisComponentId)) {
+				Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId));
+			}
+
+			for (String componentId : expectedContext.getComponentIds()) {
+				Assert.assertEquals(expectedContext.getSources(componentId),
+					topologyContext.getSources(componentId));
+				Assert.assertEquals(expectedContext.getTargets(componentId),
+					topologyContext.getTargets(componentId));
+
+				for (String streamId : expectedContext.getComponentStreams(componentId)) {
+					Assert.assertEquals(
+						expectedContext.getComponentOutputFields(componentId, streamId).toList(),
+						topologyContext.getComponentOutputFields(componentId, streamId).toList());
+				}
+			}
+
+			for (String streamId : expectedContext.getThisStreams()) {
+				Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
+					topologyContext.getThisOutputFields(streamId).toList());
+			}
+
+			HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>();
+			Set<Integer> allTaskIds = new HashSet<Integer>();
+			for (String componentId : expectedContext.getComponentIds()) {
+				List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId);
+				List<Integer> tasks = topologyContext.getComponentTasks(componentId);
+
+				Iterator<Integer> p_it = possibleTasks.iterator();
+				Iterator<Integer> t_it = tasks.iterator();
+				while(p_it.hasNext()) {
+					Assert.assertTrue(t_it.hasNext());
+					Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
+					Assert.assertTrue(allTaskIds.add(t_it.next()));
+				}
+				Assert.assertFalse(t_it.hasNext());
+			}
+
+			Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent());
+			Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
+
+			try {
+				topologyContext.getHooks();
+				Assert.fail();
+			} catch (UnsupportedOperationException e) { /* expected */ }
+
+			try {
+				topologyContext.getRegisteredMetricByName(null);
+				Assert.fail();
+			} catch (UnsupportedOperationException e) { /* expected */ }
+		}
+	}
+
+}