You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/19 06:44:02 UTC
flink git commit: [FLINK-9815][yarn][tests] Harden tests against slow job shutdowns
Repository: flink
Updated Branches:
refs/heads/master e022acbde -> 230f81753
[FLINK-9815][yarn][tests] Harden tests against slow job shutdowns
This closes #6352.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/230f8175
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/230f8175
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/230f8175
Branch: refs/heads/master
Commit: 230f81753fb56cea39cdea4c763ad8667baa9dbe
Parents: e022acb
Author: zentol <ch...@apache.org>
Authored: Tue Jul 17 13:29:16 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jul 19 08:43:37 2018 +0200
----------------------------------------------------------------------
.../flink/yarn/YARNHighAvailabilityITCase.java | 12 ++--
.../org/apache/flink/yarn/YarnTestBase.java | 63 +++++++++++++-------
2 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/230f8175/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 18bcfeb..9a8f503 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -133,7 +133,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
- ClusterClient<ApplicationId> yarnCluster = null;
+ ClusterClient<ApplicationId> yarnClusterClient = null;
final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
@@ -147,10 +147,10 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
.createClusterSpecification();
try {
- yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
+ yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification);
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
- yarnCluster.getFlinkConfiguration(),
+ yarnClusterClient.getFlinkConfiguration(),
Executors.directExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
@@ -201,8 +201,10 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
}};
} finally {
- if (yarnCluster != null) {
- yarnCluster.shutdown();
+ if (yarnClusterClient != null) {
+ log.info("Shutting down the Flink Yarn application.");
+ yarnClusterClient.shutDownCluster();
+ yarnClusterClient.shutdown();
}
if (highAvailabilityServices != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/230f8175/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 514a3d5..1a0520f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
@@ -67,6 +68,7 @@ import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -79,6 +81,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* This base class allows to use the MiniYARNCluster.
@@ -186,39 +189,55 @@ public abstract class YarnTestBase extends TestLogger {
conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
}
- /**
- * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
- */
- @After
- public void sleep() {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Assert.fail("Should not happen");
- }
- }
-
@Before
- public void checkClusterEmpty() throws IOException, YarnException {
+ public void checkClusterEmpty() {
if (yarnClient == null) {
yarnClient = YarnClient.createYarnClient();
yarnClient.init(getYarnConfiguration());
yarnClient.start();
}
- List<ApplicationReport> apps = yarnClient.getApplications();
- for (ApplicationReport app : apps) {
- if (app.getYarnApplicationState() != YarnApplicationState.FINISHED
- && app.getYarnApplicationState() != YarnApplicationState.KILLED
- && app.getYarnApplicationState() != YarnApplicationState.FAILED) {
- Assert.fail("There is at least one application on the cluster is not finished." +
- "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
+ flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+
+ isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+ }
+
+ /**
+ * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
+ */
+ @After
+ public void sleep() throws IOException, YarnException {
+ Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
+
+ boolean isAnyJobRunning = yarnClient.getApplications().stream()
+ .anyMatch(YarnTestBase::isApplicationRunning);
+
+ while (deadline.hasTimeLeft() && isAnyJobRunning) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Assert.fail("Should not happen");
}
+ isAnyJobRunning = yarnClient.getApplications().stream()
+ .anyMatch(YarnTestBase::isApplicationRunning);
}
- flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
+ if (isAnyJobRunning) {
+ final List<String> runningApps = yarnClient.getApplications().stream()
+ .filter(YarnTestBase::isApplicationRunning)
+ .map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.')
+ .collect(Collectors.toList());
+ if (!runningApps.isEmpty()) {
+ Assert.fail("There is at least one application on the cluster that is not finished." + runningApps);
+ }
+ }
+ }
- isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
+ private static boolean isApplicationRunning(ApplicationReport app) {
+ final YarnApplicationState yarnApplicationState = app.getYarnApplicationState();
+ return yarnApplicationState != YarnApplicationState.FINISHED
+ && app.getYarnApplicationState() != YarnApplicationState.KILLED
+ && app.getYarnApplicationState() != YarnApplicationState.FAILED;
}
@Nullable