You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 13:31:01 UTC

[03/19] flink git commit: [FLINK-8963][tests] Port BigUserProgramJobSubmitITCase to MiniClusterResource

[FLINK-8963][tests] Port BigUserProgramJobSubmitITCase to MiniClusterResource

This closes #5772.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25a97404
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25a97404
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25a97404

Branch: refs/heads/release-1.5
Commit: 25a97404919b0ca717c75dd52434d8f7f01c1b96
Parents: 27cf4be
Author: zentol <ch...@apache.org>
Authored: Tue Feb 20 18:02:51 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:18 2018 +0200

----------------------------------------------------------------------
 .../runtime/BigUserProgramJobSubmitITCase.java  | 84 +++++++++++++-------
 1 file changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/25a97404/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index a4d5958..b10dbec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -18,21 +18,27 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.experimental.categories.Category;
 
+import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -44,37 +50,48 @@ import static org.junit.Assert.assertEquals;
  * Integration test that verifies that a user program with a big(ger) payload is successfully
  * submitted and run.
  */
-@Ignore("Fails on job submission payload being too large - [FLINK-7285]")
+@Category(New.class)
 public class BigUserProgramJobSubmitITCase extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  The mini cluster that is shared across tests
 	// ------------------------------------------------------------------------
 
-	private static final int DEFAULT_PARALLELISM = 1;
+	private static final MiniCluster CLUSTER;
+	private static final RestClusterClient<StandaloneClusterId> CLIENT;
 
-	private static LocalFlinkMiniCluster cluster;
+	static {
+		try {
+			MiniClusterConfiguration clusterConfiguration = new MiniClusterConfiguration.Builder()
+				.setNumTaskManagers(1)
+				.setNumSlotsPerTaskManager(1)
+				.build();
+			CLUSTER = new MiniCluster(clusterConfiguration);
+			CLUSTER.start();
 
-	private static final Logger LOG = LoggerFactory.getLogger(BigUserProgramJobSubmitITCase.class);
+			URI restAddress = CLUSTER.getRestAddress();
+
+			final Configuration clientConfig = new Configuration();
+			clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
+			clientConfig.setInteger(RestOptions.REST_PORT, restAddress.getPort());
+
+			CLIENT = new RestClusterClient<>(
+				clientConfig,
+				StandaloneClusterId.getInstance());
+
+		} catch (Exception e) {
+			throw new AssertionError("Could not setup cluster.", e);
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  Cluster setup & teardown
 	// ------------------------------------------------------------------------
 
-	@BeforeClass
-	public static void setup() throws Exception {
-		// make sure we do not use a singleActorSystem for the tests
-		// (therefore, we cannot simply inherit from StreamingMultipleProgramsTestBase)
-		LOG.info("Starting FlinkMiniCluster");
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, false);
-		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
-	}
-
 	@AfterClass
 	public static void teardown() throws Exception {
-		LOG.info("Closing FlinkMiniCluster");
-		TestStreamEnvironment.unsetAsContext();
-		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+		CLIENT.shutdown();
+		CLUSTER.close();
 	}
 
 	private final Random rnd = new Random();
@@ -85,15 +102,16 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
 	@Test
 	public void bigDataInMap() throws Exception {
 
-		final byte[] data = new byte[100 * 1024 * 1024]; // 100 MB
+		final byte[] data = new byte[16 * 1024 * 1024]; // 16 MB
 		rnd.nextBytes(data); // use random data so that Java does not optimise it away
 		data[1] = 0;
 		data[3] = 0;
 		data[5] = 0;
 
-		TestListResultSink<String> resultSink = new TestListResultSink<>();
+		CollectingSink resultSink = new CollectingSink();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
 		DataStream<Integer> src = env.fromElements(1, 3, 5);
 
@@ -106,15 +124,25 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
 			}
 		}).addSink(resultSink);
 
-		env.execute();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+		CLIENT.setDetached(false);
+		CLIENT.submitJob(jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader());
 
 		List<String> expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0");
 
-		List<String> result = resultSink.getResult();
+		List<String> result = CollectingSink.result;
 
 		Collections.sort(expected);
 		Collections.sort(result);
 
 		assertEquals(expected, result);
 	}
+
+	private static class CollectingSink implements SinkFunction<String> {
+		private static final List<String> result = Collections.synchronizedList(new ArrayList<>(3));
+
+		public void invoke(String value, Context context) throws Exception {
+			result.add(value);
+		}
+	}
 }