You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/10 11:26:21 UTC

[flink] branch master updated: [FLINK-12445][yarn] Cancel application on failure

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7113845  [FLINK-12445][yarn] Cancel application on failure
7113845 is described below

commit 71138456ddf82425572cb411bed4f5ab6a76bb2c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 8 11:39:27 2019 +0200

    [FLINK-12445][yarn] Cancel application on failure
---
 .../yarn/YARNSessionCapacitySchedulerITCase.java   | 82 +++++++++++-----------
 1 file changed, 42 insertions(+), 40 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index c238037..057ff5a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -263,46 +263,48 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			"Flink JobManager is now running on ",
 			RunTypes.YARN_SESSION);
 
-		final String logs = outContent.toString();
-		final HostAndPort hostAndPort = parseJobManagerHostname(logs);
-		final String host = hostAndPort.getHostText();
-		final int port = hostAndPort.getPort();
-		LOG.info("Extracted hostname:port: {}", host, port);
-
-		submitJob("WindowJoin.jar");
-
-		//
-		// Assert that custom YARN application name "customName" is set
-		//
-		final ApplicationReport applicationReport = getOnlyApplicationReport();
-		assertEquals("customName", applicationReport.getName());
-
-		//
-		// Assert the number of TaskManager slots are set
-		//
-		waitForTaskManagerRegistration(host, port, Duration.ofMillis(30_000));
-		assertNumberOfSlotsPerTask(host, port, 3);
-
-		final Map<String, String> flinkConfig = getFlinkConfig(host, port);
-
-		//
-		// Assert dynamic properties
-		//
-		assertThat(flinkConfig, hasEntry("fancy-configuration-value", "veryFancy"));
-		assertThat(flinkConfig, hasEntry("yarn.maximum-failed-containers", "3"));
-
-		//
-		// FLINK-2213: assert that vcores are set
-		//
-		assertThat(flinkConfig, hasEntry(YarnConfigOptions.VCORES.key(), "2"));
-
-		//
-		// FLINK-1902: check if jobmanager hostname is shown in web interface
-		//
-		assertThat(flinkConfig, hasEntry(JobManagerOptions.ADDRESS.key(), host));
-
-		yarnSessionClusterRunner.sendStop();
-		yarnSessionClusterRunner.join();
+		try {
+			final String logs = outContent.toString();
+			final HostAndPort hostAndPort = parseJobManagerHostname(logs);
+			final String host = hostAndPort.getHostText();
+			final int port = hostAndPort.getPort();
+			LOG.info("Extracted hostname:port: {}", host, port);
+
+			submitJob("WindowJoin.jar");
+
+			//
+			// Assert that custom YARN application name "customName" is set
+			//
+			final ApplicationReport applicationReport = getOnlyApplicationReport();
+			assertEquals("customName", applicationReport.getName());
+
+			//
+			// Assert that the number of TaskManager slots is set
+			//
+			waitForTaskManagerRegistration(host, port, Duration.ofMillis(30_000));
+			assertNumberOfSlotsPerTask(host, port, 3);
+
+			final Map<String, String> flinkConfig = getFlinkConfig(host, port);
+
+			//
+			// Assert dynamic properties
+			//
+			assertThat(flinkConfig, hasEntry("fancy-configuration-value", "veryFancy"));
+			assertThat(flinkConfig, hasEntry("yarn.maximum-failed-containers", "3"));
+
+			//
+			// FLINK-2213: assert that vcores are set
+			//
+			assertThat(flinkConfig, hasEntry(YarnConfigOptions.VCORES.key(), "2"));
+
+			//
+			// FLINK-1902: check if jobmanager hostname is shown in web interface
+			//
+			assertThat(flinkConfig, hasEntry(JobManagerOptions.ADDRESS.key(), host));
+		} finally {
+			yarnSessionClusterRunner.sendStop();
+			yarnSessionClusterRunner.join();
+		}
 	}
 
 	private static HostAndPort parseJobManagerHostname(final String logs) {