You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:04 UTC

[flink] 05/08: [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20447b23848c2f69afef2742e06922fe174ddc47
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 27 17:14:00 2018 +0200

    [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync
---
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java  | 13 +++++++++++--
 .../AbstractTaskManagerProcessFailureRecoveryTest.java      |  7 +------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 5665500..9eaef34 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -88,7 +89,7 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * <p>Specialization of this class can be used for the session mode and the per-job mode
  */
-public abstract class ClusterEntrypoint implements FatalErrorHandler {
+public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
 
 	public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions
 		.key("internal.cluster.execution-mode")
@@ -312,6 +313,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 	}
 
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return shutDownAsync(
+			ApplicationStatus.UNKNOWN,
+			"Cluster entrypoint has been closed externally.",
+			true).thenAccept(ignored -> {});
+	}
+
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		synchronized (lock) {
 			Throwable exception = null;
@@ -392,7 +401,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return resultConfiguration;
 	}
 
-	public CompletableFuture<ApplicationStatus> shutDownAsync(
+	private CompletableFuture<ApplicationStatus> shutDownAsync(
 			ApplicationStatus applicationStatus,
 			@Nullable String diagnostics,
 			boolean cleanupHaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 1374b70..5d7f26b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -101,9 +100,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
 		jmConfig.setInteger(RestOptions.PORT, restPort);
 
-		final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig);
-
-		try {
+		try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -228,8 +225,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			if (taskManagerProcess3 != null) {
 				taskManagerProcess3.destroy();
 			}
-
-			clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
 		}
 	}