You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/23 16:44:27 UTC

[flink] branch master updated (f343204 -> d018eed)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f343204  [FLINK-10157][State TTL] Allow `null` user values in map state with TTL
     new f6a14c0  [FLINK-10369][tests] Enable YARNITCase to test per job mode deployment
     new d018eed  [hotfix] Move TestingYarnClusterDescriptor#TestJarFinder into YarnTestUtils

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/yarn/TestingYarnClusterDescriptor.java   | 22 ++-----
 .../java/org/apache/flink/yarn/YARNITCase.java     | 71 +++++++++++++---------
 .../org/apache/flink/yarn/util/YarnTestUtils.java  | 19 ++++++
 3 files changed, 64 insertions(+), 48 deletions(-)


[flink] 01/02: [FLINK-10369][tests] Enable YARNITCase to test per job mode deployment

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6a14c02d898b881903e62ed847bf595beaea7cd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 19 15:09:54 2018 +0200

    [FLINK-10369][tests] Enable YARNITCase to test per job mode deployment
    
    This closes #6717.
---
 .../java/org/apache/flink/yarn/YARNITCase.java     | 68 +++++++++++++---------
 1 file changed, 39 insertions(+), 29 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 758a098..56cfdb6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -20,10 +20,12 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -32,12 +34,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
-import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test cases for the deployment of Yarn Flink clusters.
@@ -50,7 +56,6 @@ public class YARNITCase extends YarnTestBase {
 		startYARNWithConfig(YARN_CONFIGURATION);
 	}
 
-	@Ignore("The cluster cannot be stopped yet.")
 	@Test
 	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
@@ -77,9 +82,9 @@ public class YARNITCase extends YarnTestBase {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(2);
 
-			env.addSource(new InfiniteSource())
+			env.addSource(new NoDataSource())
 				.shuffle()
-				.addSink(new DiscardingSink<Integer>());
+				.addSink(new DiscardingSink<>());
 
 			final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -87,40 +92,45 @@ public class YARNITCase extends YarnTestBase {
 
 			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-			ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
-				clusterSpecification,
-				jobGraph,
-				true);
+			ApplicationId applicationId = null;
+			ClusterClient<ApplicationId> clusterClient = null;
 
-			clusterClient.shutdown();
-		}
-	}
+			try {
+				clusterClient = yarnClusterDescriptor.deployJobCluster(
+					clusterSpecification,
+					jobGraph,
+					false);
+				applicationId = clusterClient.getClusterId();
 
-	private static class InfiniteSource implements ParallelSourceFunction<Integer> {
+				assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
+				final RestClusterClient<ApplicationId> restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;
 
-		private static final long serialVersionUID = 1642561062000662861L;
-		private volatile boolean running;
-		private final Random random;
+				final CompletableFuture<JobResult> jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID());
 
-		InfiniteSource() {
-			running = true;
-			random = new Random();
-		}
+				final JobResult jobResult = jobResultCompletableFuture.get();
 
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			while (running) {
-				synchronized (ctx.getCheckpointLock()) {
-					ctx.collect(random.nextInt());
+				assertThat(jobResult, is(notNullValue()));
+				assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+			} finally {
+				if (clusterClient != null) {
+					clusterClient.shutdown();
 				}
 
-				Thread.sleep(5L);
+				if (applicationId != null) {
+					yarnClusterDescriptor.killCluster(applicationId);
+				}
 			}
 		}
+	}
+
+	private static class NoDataSource implements ParallelSourceFunction<Integer> {
+
+		private static final long serialVersionUID = 1642561062000662861L;
 
 		@Override
-		public void cancel() {
-			running = false;
-		}
+		public void run(SourceContext<Integer> ctx) {}
+
+		@Override
+		public void cancel() {}
 	}
 }


[flink] 02/02: [hotfix] Move TestingYarnClusterDescriptor#TestJarFinder into YarnTestUtils

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d018eedf4b8ef49c4943cd2d4c47b53a56640812
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 20 10:02:52 2018 +0200

    [hotfix] Move TestingYarnClusterDescriptor#TestJarFinder into YarnTestUtils
    
    This commit moves the TestJarFinder class out of TestingYarnClusterDescriptor
    and into YarnTestUtils as a public nested class, so that other tests such as
    YARNITCase can reuse it.
---
 .../flink/yarn/TestingYarnClusterDescriptor.java   | 22 ++++------------------
 .../java/org/apache/flink/yarn/YARNITCase.java     |  3 ++-
 .../org/apache/flink/yarn/util/YarnTestUtils.java  | 19 +++++++++++++++++++
 3 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 37b8d41..a16cb0b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -22,12 +22,12 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -52,15 +52,15 @@ public class TestingYarnClusterDescriptor extends LegacyYarnClusterDescriptor {
 			sharedYarnClient);
 		List<File> filesToShip = new ArrayList<>();
 
-		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
+		File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
 		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
 			"Make sure to package the flink-yarn-tests module.");
 
-		File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
+		File testingRuntimeJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-runtime"));
 		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
 			"jar. Make sure to package the flink-runtime module.");
 
-		File testingYarnJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn"));
+		File testingYarnJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn"));
 		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-yarn tests " +
 			"jar. Make sure to package the flink-yarn module.");
 
@@ -89,18 +89,4 @@ public class TestingYarnClusterDescriptor extends LegacyYarnClusterDescriptor {
 		throw new UnsupportedOperationException("Cannot deploy a per-job cluster yet.");
 	}
 
-	static class TestJarFinder implements FilenameFilter {
-
-		private final String jarName;
-
-		TestJarFinder(final String jarName) {
-			this.jarName = jarName;
-		}
-
-		@Override
-		public boolean accept(File dir, String name) {
-			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
-				dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator);
-		}
-	}
 }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 56cfdb6..814e808 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -88,7 +89,7 @@ public class YARNITCase extends YarnTestBase {
 
 			final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-			File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+			File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
 
 			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
index 25d833b..f7a9634 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn.util;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
 
 /**
  * Utility methods for YARN tests.
@@ -33,4 +34,22 @@ public class YarnTestUtils {
 		}
 		return f;
 	}
+
+	/**
+	 * Filename filter which finds the test jar for the given name.
+	 */
+	public static class TestJarFinder implements FilenameFilter {
+
+		private final String jarName;
+
+		public TestJarFinder(final String jarName) {
+			this.jarName = jarName;
+		}
+
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
+				dir.getAbsolutePath().contains(File.separator + jarName + File.separator);
+		}
+	}
 }