You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/30 11:52:18 UTC

[flink] 01/02: [FLINK-12038][tests] Harden YARNITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 420f6cac29bf795f524a40bca60adb46edfa6e3a
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Fri Jul 19 12:36:45 2019 +0800

    [FLINK-12038][tests] Harden YARNITCase
    
    Only kill Yarn application if it does not properly terminate.
    
    This closes #9175.
---
 .../java/org/apache/flink/yarn/YARNITCase.java     | 37 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 470c04d..81a299b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
@@ -32,11 +33,14 @@ import org.apache.flink.yarn.util.YarnTestUtils;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 
@@ -50,6 +54,10 @@ import static org.junit.Assert.assertThat;
  */
 public class YARNITCase extends YarnTestBase {
 
+	private final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
+
+	private final int sleepIntervalInMS = 100;
+
 	@BeforeClass
 	public static void setup() {
 		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
@@ -113,16 +121,37 @@ public class YARNITCase extends YarnTestBase {
 
 					assertThat(jobResult, is(notNullValue()));
 					assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+
+					waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor);
 				} finally {
 					if (clusterClient != null) {
 						clusterClient.shutdown();
 					}
-
-					if (applicationId != null) {
-						yarnClusterDescriptor.killCluster(applicationId);
-					}
 				}
 			}
 		});
 	}
+
+	private void waitApplicationFinishedElseKillIt(
+			ApplicationId applicationId,
+			Duration timeout,
+			YarnClusterDescriptor yarnClusterDescriptor) throws Exception {
+		Deadline deadline = Deadline.now().plus(timeout);
+		YarnApplicationState state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+
+		while (state != YarnApplicationState.FINISHED) {
+			if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+				Assert.fail("Application became FAILED or KILLED while expecting FINISHED");
+			}
+
+			if (deadline.isOverdue()) {
+				yarnClusterDescriptor.killCluster(applicationId);
+				Assert.fail("Application didn't finish before timeout");
+			}
+
+			sleep(sleepIntervalInMS);
+			state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+		}
+	}
+
 }