You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:04 UTC
[flink] 05/08: [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 20447b23848c2f69afef2742e06922fe174ddc47
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 27 17:14:00 2018 +0200
[hotfix] Let ClusterEntrypoint implement AutoCloseableAsync
---
.../apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 13 +++++++++++--
.../AbstractTaskManagerProcessFailureRecoveryTest.java | 7 +------
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 5665500..9eaef34 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
@@ -88,7 +89,7 @@ import scala.concurrent.duration.FiniteDuration;
*
* <p>Specialization of this class can be used for the session mode and the per-job mode
*/
-public abstract class ClusterEntrypoint implements FatalErrorHandler {
+public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions
.key("internal.cluster.execution-mode")
@@ -312,6 +313,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
}
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return shutDownAsync(
+ ApplicationStatus.UNKNOWN,
+ "Cluster entrypoint has been closed externally.",
+ true).thenAccept(ignored -> {});
+ }
+
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
synchronized (lock) {
Throwable exception = null;
@@ -392,7 +401,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
return resultConfiguration;
}
- public CompletableFuture<ApplicationStatus> shutDownAsync(
+ private CompletableFuture<ApplicationStatus> shutDownAsync(
ApplicationStatus applicationStatus,
@Nullable String diagnostics,
boolean cleanupHaData) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 1374b70..5d7f26b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -101,9 +100,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
jmConfig.setInteger(RestOptions.PORT, restPort);
- final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig);
-
- try {
+ try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) {
// check that we run this test only if the java command
// is available on this machine
String javaCommand = getJavaCommandPath();
@@ -228,8 +225,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
if (taskManagerProcess3 != null) {
taskManagerProcess3.destroy();
}
-
- clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
}
}