You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/19 06:44:02 UTC

flink git commit: [FLINK-9815][yarn][tests] Harden tests against slow job shutdowns

Repository: flink
Updated Branches:
  refs/heads/master e022acbde -> 230f81753


[FLINK-9815][yarn][tests] Harden tests against slow job shutdowns

This closes #6352.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/230f8175
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/230f8175
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/230f8175

Branch: refs/heads/master
Commit: 230f81753fb56cea39cdea4c763ad8667baa9dbe
Parents: e022acb
Author: zentol <ch...@apache.org>
Authored: Tue Jul 17 13:29:16 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jul 19 08:43:37 2018 +0200

----------------------------------------------------------------------
 .../flink/yarn/YARNHighAvailabilityITCase.java  | 12 ++--
 .../org/apache/flink/yarn/YarnTestBase.java     | 63 +++++++++++++-------
 2 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/230f8175/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 18bcfeb..9a8f503 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -133,7 +133,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
 
-		ClusterClient<ApplicationId> yarnCluster = null;
+		ClusterClient<ApplicationId> yarnClusterClient = null;
 
 		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
 
@@ -147,10 +147,10 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			.createClusterSpecification();
 
 		try {
-			yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
+			yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification);
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				yarnCluster.getFlinkConfiguration(),
+				yarnClusterClient.getFlinkConfiguration(),
 				Executors.directExecutor(),
 				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
@@ -201,8 +201,10 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 			}};
 		} finally {
-			if (yarnCluster != null) {
-				yarnCluster.shutdown();
+			if (yarnClusterClient != null) {
+				log.info("Shutting down the Flink Yarn application.");
+				yarnClusterClient.shutDownCluster();
+				yarnClusterClient.shutdown();
 			}
 
 			if (highAvailabilityServices != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/230f8175/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 514a3d5..1a0520f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
@@ -67,6 +68,7 @@ import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PrintStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -79,6 +81,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * This base class allows to use the MiniYARNCluster.
@@ -186,39 +189,55 @@ public abstract class YarnTestBase extends TestLogger {
 		conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
 	}
 
-	/**
-	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
-	 */
-	@After
-	public void sleep() {
-		try {
-			Thread.sleep(500);
-		} catch (InterruptedException e) {
-			Assert.fail("Should not happen");
-		}
-	}
-
 	@Before
-	public void checkClusterEmpty() throws IOException, YarnException {
+	public void checkClusterEmpty() {
 		if (yarnClient == null) {
 			yarnClient = YarnClient.createYarnClient();
 			yarnClient.init(getYarnConfiguration());
 			yarnClient.start();
 		}
 
-		List<ApplicationReport> apps = yarnClient.getApplications();
-		for (ApplicationReport app : apps) {
-			if (app.getYarnApplicationState() != YarnApplicationState.FINISHED
-					&& app.getYarnApplicationState() != YarnApplicationState.KILLED
-					&& app.getYarnApplicationState() != YarnApplicationState.FAILED) {
-				Assert.fail("There is at least one application on the cluster is not finished." +
-						"App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
+		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+
+		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+	}
+
+	/**
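+	 * Wait up to 10 seconds for running YARN applications to finish between the tests (we are re-using the YARN cluster for the tests); fail if any application is still running afterwards.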
+	 */
+	@After
+	public void sleep() throws IOException, YarnException {
+		Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
+
+		boolean isAnyJobRunning = yarnClient.getApplications().stream()
+			.anyMatch(YarnTestBase::isApplicationRunning);
+
+		while (deadline.hasTimeLeft() && isAnyJobRunning) {
+			try {
+				Thread.sleep(500);
+			} catch (InterruptedException e) {
+				Assert.fail("Should not happen");
 			}
+			isAnyJobRunning = yarnClient.getApplications().stream()
+				.anyMatch(YarnTestBase::isApplicationRunning);
 		}
 
-		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+		if (isAnyJobRunning) {
+			final List<String> runningApps = yarnClient.getApplications().stream()
+				.filter(YarnTestBase::isApplicationRunning)
+				.map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.')
+				.collect(Collectors.toList());
+			if (!runningApps.isEmpty()) {
+				Assert.fail("There is at least one application on the cluster that is not finished." + runningApps);
+			}
+		}
+	}
 
-		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+	private static boolean isApplicationRunning(ApplicationReport app) { // true unless the state is FINISHED, KILLED or FAILED
+		final YarnApplicationState yarnApplicationState = app.getYarnApplicationState(); // read once so all checks see the same state
+		return yarnApplicationState != YarnApplicationState.FINISHED
+			&& yarnApplicationState != YarnApplicationState.KILLED
+			&& yarnApplicationState != YarnApplicationState.FAILED;
 	}
 
 	@Nullable