Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/23 16:45:20 UTC

[GitHub] tillrohrmann closed pull request #6717: [FLINK-10369][tests] Enable YARNITCase to test per job mode deployment

tillrohrmann closed pull request #6717: [FLINK-10369][tests] Enable YARNITCase to test per job mode deployment
URL: https://github.com/apache/flink/pull/6717

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 37b8d410a5d..a16cb0b752c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -22,12 +22,12 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -52,15 +52,15 @@ public TestingYarnClusterDescriptor(
 			sharedYarnClient);
 		List<File> filesToShip = new ArrayList<>();
 
-		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
+		File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
 		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
 			"Make sure to package the flink-yarn-tests module.");
 
-		File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
+		File testingRuntimeJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-runtime"));
 		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
 			"jar. Make sure to package the flink-runtime module.");
 
-		File testingYarnJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn"));
+		File testingYarnJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn"));
 		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-yarn tests " +
 			"jar. Make sure to package the flink-yarn module.");
 
@@ -89,18 +89,4 @@ public YarnClusterClient deployJobCluster(
 		throw new UnsupportedOperationException("Cannot deploy a per-job cluster yet.");
 	}
 
-	static class TestJarFinder implements FilenameFilter {
-
-		private final String jarName;
-
-		TestJarFinder(final String jarName) {
-			this.jarName = jarName;
-		}
-
-		@Override
-		public boolean accept(File dir, String name) {
-			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
-				dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator);
-		}
-	}
 }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9a8f5033f3f..3625a90076f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -60,7 +60,7 @@
 /**
  * Tests that verify correct HA behavior.
  */
-public class YARNHighAvailabilityITCase extends YarnTestBase {
+	public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 	private static TestingServer zkServer;
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 758a09866d0..814e8082f70 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -20,24 +20,31 @@
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
-import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test cases for the deployment of Yarn Flink clusters.
@@ -50,7 +57,6 @@ public static void setup() {
 		startYARNWithConfig(YARN_CONFIGURATION);
 	}
 
-	@Ignore("The cluster cannot be stopped yet.")
 	@Test
 	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
@@ -77,50 +83,55 @@ public void testPerJobMode() throws Exception {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(2);
 
-			env.addSource(new InfiniteSource())
+			env.addSource(new NoDataSource())
 				.shuffle()
-				.addSink(new DiscardingSink<Integer>());
+				.addSink(new DiscardingSink<>());
 
 			final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-			File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+			File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
 
 			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-			ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
-				clusterSpecification,
-				jobGraph,
-				true);
+			ApplicationId applicationId = null;
+			ClusterClient<ApplicationId> clusterClient = null;
 
-			clusterClient.shutdown();
-		}
-	}
+			try {
+				clusterClient = yarnClusterDescriptor.deployJobCluster(
+					clusterSpecification,
+					jobGraph,
+					false);
+				applicationId = clusterClient.getClusterId();
 
-	private static class InfiniteSource implements ParallelSourceFunction<Integer> {
+				assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
+				final RestClusterClient<ApplicationId> restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;
 
-		private static final long serialVersionUID = 1642561062000662861L;
-		private volatile boolean running;
-		private final Random random;
+				final CompletableFuture<JobResult> jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID());
 
-		InfiniteSource() {
-			running = true;
-			random = new Random();
-		}
+				final JobResult jobResult = jobResultCompletableFuture.get();
 
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			while (running) {
-				synchronized (ctx.getCheckpointLock()) {
-					ctx.collect(random.nextInt());
+				assertThat(jobResult, is(notNullValue()));
+				assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+			} finally {
+				if (clusterClient != null) {
+					clusterClient.shutdown();
 				}
 
-				Thread.sleep(5L);
+				if (applicationId != null) {
+					yarnClusterDescriptor.killCluster(applicationId);
+				}
 			}
 		}
+	}
+
+	private static class NoDataSource implements ParallelSourceFunction<Integer> {
+
+		private static final long serialVersionUID = 1642561062000662861L;
 
 		@Override
-		public void cancel() {
-			running = false;
-		}
+		public void run(SourceContext<Integer> ctx) {}
+
+		@Override
+		public void cancel() {}
 	}
 }
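
For readers who want the gist of the YARNITCase change above without reading through the hunk markers, here is a condensed sketch of the deploy-await-teardown pattern the new testPerJobMode follows. It is a sketch only: the class name PerJobDeploymentSketch and the method runAndTearDown are illustrative, and the YarnClusterDescriptor parameter type is an assumption, since the descriptor's construction lies outside the quoted hunks. The calls themselves (deployJobCluster with detached = false, getClusterId, requestJobResult, getSerializedThrowable, shutdown, killCluster) are exactly the ones the diff introduces.

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.yarn.YarnClusterDescriptor;

import org.apache.hadoop.yarn.api.records.ApplicationId;

final class PerJobDeploymentSketch {

	// The YarnClusterDescriptor/ClusterSpecification setup is assumed to have happened
	// elsewhere, as it does in the parts of the test not shown in the quoted hunks.
	static void runAndTearDown(
			YarnClusterDescriptor yarnClusterDescriptor,
			ClusterSpecification clusterSpecification,
			JobGraph jobGraph) throws Exception {

		ApplicationId applicationId = null;
		ClusterClient<ApplicationId> clusterClient = null;

		try {
			// Deploy in attached mode (detached = false) so the client can wait for the job result.
			clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, false);
			applicationId = clusterClient.getClusterId();

			// The per-job deployment hands back a REST-based client; ask it for the final JobResult.
			final RestClusterClient<ApplicationId> restClusterClient =
				(RestClusterClient<ApplicationId>) clusterClient;
			final JobResult jobResult = restClusterClient.requestJobResult(jobGraph.getJobID()).get();

			if (jobResult.getSerializedThrowable().isPresent()) {
				throw new AssertionError("Per-job deployment finished with a failure.");
			}
		} finally {
			// Tear down even if the checks above fail: close the client, then kill the YARN application.
			if (clusterClient != null) {
				clusterClient.shutdown();
			}
			if (applicationId != null) {
				yarnClusterDescriptor.killCluster(applicationId);
			}
		}
	}
}

Deploying attached rather than detached is what lets the test block on requestJobResult and then clean up deterministically in the finally block, which is why the @Ignore annotation could be removed.
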
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
index 25d833b570f..f7a9634b872 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
 
 /**
  * Utility methods for YARN tests.
@@ -33,4 +34,22 @@ public static File getTestJarPath(String fileName) throws FileNotFoundException
 		}
 		return f;
 	}
+
+	/**
+	 * Filename filter which finds the test jar for the given name.
+	 */
+	public static class TestJarFinder implements FilenameFilter {
+
+		private final String jarName;
+
+		public TestJarFinder(final String jarName) {
+			this.jarName = jarName;
+		}
+
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
+				dir.getAbsolutePath().contains(File.separator + jarName + File.separator);
+		}
+	}
 }
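
The TestJarFinder that this last hunk moves into YarnTestUtils is a plain java.io.FilenameFilter, so its matching rule can be exercised directly. Below is a small illustrative sketch; the directory, jar file name, and version string are made up for the example and are not taken from the diff. The filter accepts a name only if it starts with the module name, ends with "-tests.jar", and the directory's absolute path contains the module name as its own path segment.

import java.io.File;

import org.apache.flink.yarn.util.YarnTestUtils;

final class TestJarFinderSketch {

	public static void main(String[] args) {
		// Look for the flink-runtime tests jar, mirroring the call sites in TestingYarnClusterDescriptor.
		YarnTestUtils.TestJarFinder finder = new YarnTestUtils.TestJarFinder("flink-runtime");

		// Accepted: a "-tests.jar" under a directory whose path contains /flink-runtime/
		// (path and version are illustrative only).
		boolean acceptedTestsJar = finder.accept(
			new File("/tmp/flink/flink-runtime/target"),
			"flink-runtime-1.7-SNAPSHOT-tests.jar");

		// Rejected: the plain jar of the same module, because it does not end with "-tests.jar".
		boolean acceptedPlainJar = finder.accept(
			new File("/tmp/flink/flink-runtime/target"),
			"flink-runtime-1.7-SNAPSHOT.jar");

		System.out.println(acceptedTestsJar + " " + acceptedPlainJar); // true false (on a Unix-style path)
	}
}
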

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services