You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/18 09:13:25 UTC

flink git commit: [hotfix] Properly stop Flink Yarn application in YARNSessionFIFOITCase#testJavaAPI

Repository: flink
Updated Branches:
  refs/heads/master 28cc0720c -> d6c069d2c


[hotfix] Properly stop Flink Yarn application in YARNSessionFIFOITCase#testJavaAPI

Calls ClusterClient#shutDownCluster in order to stop the Flink Yarn application


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6c069d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6c069d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6c069d2

Branch: refs/heads/master
Commit: d6c069d2c6e933ce1f4e112972eec7cbec82c8fa
Parents: 28cc072
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 18 11:11:25 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 18 11:13:11 2018 +0200

----------------------------------------------------------------------
 .../flink/yarn/YARNSessionFIFOITCase.java       | 58 ++++++++++----------
 1 file changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6c069d2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index bb479ae..f027399 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -323,38 +323,40 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 				.setSlotsPerTaskManager(1)
 				.createClusterSpecification();
 			// deploy
-			ClusterClient<ApplicationId> yarnCluster = null;
+			ClusterClient<ApplicationId> yarnClusterClient = null;
 			try {
-				yarnCluster = clusterDescriptor.deploySessionCluster(clusterSpecification);
-			} catch (Exception e) {
-				LOG.warn("Failing test", e);
-				Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
-			}
-			GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
-			for (int second = 0; second < waitTime * 2; second++) { // run "forever"
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException e) {
-					LOG.warn("Interrupted", e);
-				}
-				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-				if (status != null && status.equals(expectedStatus)) {
-					LOG.info("ClusterClient reached status " + status);
-					break; // all good, cluster started
+				yarnClusterClient = clusterDescriptor.deploySessionCluster(clusterSpecification);
+
+				GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
+				for (int second = 0; second < waitTime * 2; second++) { // run "forever"
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						LOG.warn("Interrupted", e);
+					}
+					GetClusterStatusResponse status = yarnClusterClient.getClusterStatus();
+					if (status != null && status.equals(expectedStatus)) {
+						LOG.info("ClusterClient reached status " + status);
+						break; // all good, cluster started
+					}
+					if (second > waitTime) {
+						// we waited for 15 seconds. cluster didn't come up correctly
+						Assert.fail("The custer didn't start after " + waitTime + " seconds");
+					}
 				}
-				if (second > waitTime) {
-					// we waited for 15 seconds. cluster didn't come up correctly
-					Assert.fail("The custer didn't start after " + waitTime + " seconds");
+
+				// use the cluster
+				Assert.assertNotNull(yarnClusterClient.getClusterConnectionInfo());
+				Assert.assertNotNull(yarnClusterClient.getWebInterfaceURL());
+				LOG.info("All tests passed.");
+			} finally {
+				if (yarnClusterClient != null) {
+					// shutdown cluster
+					LOG.info("Shutting down the Flink Yarn application.");
+					yarnClusterClient.shutDownCluster();
+					yarnClusterClient.shutdown();
 				}
 			}
-
-			// use the cluster
-			Assert.assertNotNull(yarnCluster.getClusterConnectionInfo());
-			Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
-
-			LOG.info("Shutting down cluster. All tests passed");
-			// shutdown cluster
-			yarnCluster.shutdown();
 		}
 		LOG.info("Finished testJavaAPI()");
 	}