You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/30 11:52:18 UTC
[flink] 01/02: [FLINK-12038][tests] Harden YARNITCase
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 420f6cac29bf795f524a40bca60adb46edfa6e3a
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Fri Jul 19 12:36:45 2019 +0800
[FLINK-12038][tests] Harden YARNITCase
Wait up to 10 seconds for the YARN application to finish on its own, and only kill it if it does not terminate within that timeout.
This closes #9175.
---
.../java/org/apache/flink/yarn/YARNITCase.java | 37 +++++++++++++++++++---
1 file changed, 33 insertions(+), 4 deletions(-)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 470c04d..81a299b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
@@ -32,11 +33,14 @@ import org.apache.flink.yarn.util.YarnTestUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
+import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
@@ -50,6 +54,10 @@ import static org.junit.Assert.assertThat;
*/
public class YARNITCase extends YarnTestBase {
+ private final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
+
+ private final int sleepIntervalInMS = 100;
+
@BeforeClass
public static void setup() {
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
@@ -113,16 +121,37 @@ public class YARNITCase extends YarnTestBase {
assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+
+ waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor);
} finally {
if (clusterClient != null) {
clusterClient.shutdown();
}
-
- if (applicationId != null) {
- yarnClusterDescriptor.killCluster(applicationId);
- }
}
}
});
}
+
+ private void waitApplicationFinishedElseKillIt(
+ ApplicationId applicationId,
+ Duration timeout,
+ YarnClusterDescriptor yarnClusterDescriptor) throws Exception {
+ Deadline deadline = Deadline.now().plus(timeout);
+ YarnApplicationState state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+
+ while (state != YarnApplicationState.FINISHED) {
+ if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+ Assert.fail("Application became FAILED or KILLED while expecting FINISHED");
+ }
+
+ if (deadline.isOverdue()) {
+ yarnClusterDescriptor.killCluster(applicationId);
+ Assert.fail("Application didn't finish before timeout");
+ }
+
+ sleep(sleepIntervalInMS);
+ state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+ }
+ }
+
}