Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/23 16:44:28 UTC

[flink] 01/02: [FLINK-10369][tests] Enable YARNITCase to test per job mode deployment

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6a14c02d898b881903e62ed847bf595beaea7cd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 19 15:09:54 2018 +0200

    [FLINK-10369][tests] Enable YARNITCase to test per job mode deployment
    
    This closes #6717.
---
 .../java/org/apache/flink/yarn/YARNITCase.java     | 68 +++++++++++++---------
 1 file changed, 39 insertions(+), 29 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 758a098..56cfdb6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -20,10 +20,12 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -32,12 +34,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
-import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test cases for the deployment of Yarn Flink clusters.
@@ -50,7 +56,6 @@ public class YARNITCase extends YarnTestBase {
 		startYARNWithConfig(YARN_CONFIGURATION);
 	}
 
-	@Ignore("The cluster cannot be stopped yet.")
 	@Test
 	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
@@ -77,9 +82,9 @@ public class YARNITCase extends YarnTestBase {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(2);
 
-			env.addSource(new InfiniteSource())
+			env.addSource(new NoDataSource())
 				.shuffle()
-				.addSink(new DiscardingSink<Integer>());
+				.addSink(new DiscardingSink<>());
 
 			final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -87,40 +92,45 @@ public class YARNITCase extends YarnTestBase {
 
 			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-			ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
-				clusterSpecification,
-				jobGraph,
-				true);
+			ApplicationId applicationId = null;
+			ClusterClient<ApplicationId> clusterClient = null;
 
-			clusterClient.shutdown();
-		}
-	}
+			try {
+				clusterClient = yarnClusterDescriptor.deployJobCluster(
+					clusterSpecification,
+					jobGraph,
+					false);
+				applicationId = clusterClient.getClusterId();
 
-	private static class InfiniteSource implements ParallelSourceFunction<Integer> {
+				assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
+				final RestClusterClient<ApplicationId> restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;
 
-		private static final long serialVersionUID = 1642561062000662861L;
-		private volatile boolean running;
-		private final Random random;
+				final CompletableFuture<JobResult> jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID());
 
-		InfiniteSource() {
-			running = true;
-			random = new Random();
-		}
+				final JobResult jobResult = jobResultCompletableFuture.get();
 
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			while (running) {
-				synchronized (ctx.getCheckpointLock()) {
-					ctx.collect(random.nextInt());
+				assertThat(jobResult, is(notNullValue()));
+				assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+			} finally {
+				if (clusterClient != null) {
+					clusterClient.shutdown();
 				}
 
-				Thread.sleep(5L);
+				if (applicationId != null) {
+					yarnClusterDescriptor.killCluster(applicationId);
+				}
 			}
 		}
+	}
+
+	private static class NoDataSource implements ParallelSourceFunction<Integer> {
+
+		private static final long serialVersionUID = 1642561062000662861L;
 
 		@Override
-		public void cancel() {
-			running = false;
-		}
+		public void run(SourceContext<Integer> ctx) {}
+
+		@Override
+		public void cancel() {}
 	}
 }
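
For reference, below is a minimal annotated sketch of the deploy-and-wait pattern the reworked test uses, assembled only from the calls visible in the diff above. The construction of yarnClusterDescriptor, clusterSpecification and jobGraph is assumed to happen as in the rest of YARNITCase (not shown here); treat this as an illustration of the control flow, not a drop-in implementation.

    ApplicationId applicationId = null;
    ClusterClient<ApplicationId> clusterClient = null;

    try {
        // Deploy in attached mode (detached = false) so the client can poll for the job result.
        clusterClient = yarnClusterDescriptor.deployJobCluster(
            clusterSpecification,
            jobGraph,
            false);
        applicationId = clusterClient.getClusterId();

        // The per-job deployment is expected to return a RestClusterClient,
        // which exposes requestJobResult for awaiting the final job state.
        assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
        final RestClusterClient<ApplicationId> restClusterClient =
            (RestClusterClient<ApplicationId>) clusterClient;

        // Block until the job reaches a terminal state, then check that it did not fail.
        final JobResult jobResult = restClusterClient
            .requestJobResult(jobGraph.getJobID())
            .get();
        assertThat(jobResult, is(notNullValue()));
        assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
    } finally {
        // Release client resources and tear down the YARN application,
        // even if one of the assertions above fails.
        if (clusterClient != null) {
            clusterClient.shutdown();
        }
        if (applicationId != null) {
            yarnClusterDescriptor.killCluster(applicationId);
        }
    }

Because the NoDataSource emits nothing and returns from run() immediately, the submitted job finishes on its own, which is what allows the previously @Ignore'd test to wait for the job result and then stop the cluster.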