You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:43:12 UTC

[flink] 02/04: [FLINK-10411] Move System.exit out of ClusterEntrypoint

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9ebf421c8979d99f2ae960075b76796ba0b6ac4
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 09:27:34 2018 +0200

    [FLINK-10411] Move System.exit out of ClusterEntrypoint
    
    Move the logic that decides when to exit the JVM process out of the
    ClusterEntrypoint so that the caller is now responsible for making this call.
    This makes the ClusterEntrypoint easier to use for testing purposes.
---
 .../entrypoint/StandaloneJobClusterEntryPoint.java |   4 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |   5 +-
 .../entrypoint/MesosSessionClusterEntrypoint.java  |   5 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      | 128 +++++++++++----------
 .../entrypoint/ClusterEntrypointException.java     |  40 +++++++
 .../StandaloneSessionClusterEntrypoint.java        |   4 +-
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |   5 +-
 .../entrypoint/YarnSessionClusterEntrypoint.java   |   7 +-
 8 files changed, 125 insertions(+), 73 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index cdd44b5..b81d992 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -61,7 +61,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new JobClusterComponent(
 			StandaloneResourceManagerFactory.INSTANCE,
 			new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
@@ -94,6 +94,6 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 			clusterConfiguration.getSavepointRestoreSettings(),
 			clusterConfiguration.getArgs());
 
-		entrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
 	}
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index ed2175a..922d5cd 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
 import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
@@ -108,7 +109,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new JobClusterComponent(
 			new MesosResourceManagerFactory(
 				mesosServices,
@@ -141,6 +142,6 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
 		MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);
 
-		clusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
 	}
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 0cc3053..f691940 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -107,7 +108,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new SessionClusterComponent(new MesosResourceManagerFactory(
 			mesosServices,
 			mesosConfig,
@@ -138,7 +139,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
 		MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);
 
-		clusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
 	}
 
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 39e1265..54ccaec 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -73,7 +73,10 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -94,19 +97,19 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
 	protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
 
+	private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30L);
+
 	/** The lock to guard startup / shutdown / manipulation methods. */
 	private final Object lock = new Object();
 
 	private final Configuration configuration;
 
-	private final CompletableFuture<Void> terminationFuture;
-
-	private final AtomicBoolean isTerminating = new AtomicBoolean(false);
+	private final CompletableFuture<ApplicationStatus> terminationFuture;
 
 	private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
 	@GuardedBy("lock")
-	private ClusterComponent<?> dispatcherComponent;
+	private ClusterComponent<?> clusterComponent;
 
 	@GuardedBy("lock")
 	private MetricRegistryImpl metricRegistry;
@@ -138,11 +141,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG);
 	}
 
-	public CompletableFuture<Void> getTerminationFuture() {
+	public CompletableFuture<ApplicationStatus> getTerminationFuture() {
 		return terminationFuture;
 	}
 
-	protected void startCluster() {
+	protected void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
@@ -157,17 +160,24 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			});
 		} catch (Throwable t) {
 			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
-			LOG.error("Cluster initialization failed.", strippedThrowable);
 
-			shutDownAndTerminate(
-				STARTUP_FAILURE_RETURN_CODE,
-				ApplicationStatus.FAILED,
-				strippedThrowable.getMessage(),
-				false);
+			try {
+				// clean up any partial state
+				shutDownAsync(
+					ApplicationStatus.FAILED,
+					ExceptionUtils.stringifyException(strippedThrowable),
+					false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+			} catch (InterruptedException | ExecutionException | TimeoutException e) {
+				strippedThrowable.addSuppressed(e);
+			}
+
+			throw new ClusterEntrypointException(
+				String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
+				strippedThrowable);
 		}
 	}
 
-	protected void configureFileSystems(Configuration configuration) throws Exception {
+	private void configureFileSystems(Configuration configuration) throws Exception {
 		LOG.info("Install default filesystem.");
 
 		try {
@@ -194,9 +204,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
 
-			dispatcherComponent = createDispatcherComponent(configuration);
+			clusterComponent = createClusterComponent(configuration);
 
-			dispatcherComponent.startComponent(
+			clusterComponent.startComponent(
 				configuration,
 				commonRpcService,
 				haServices,
@@ -206,19 +216,17 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				archivedExecutionGraphStore,
 				this);
 
-			dispatcherComponent.getShutDownFuture().whenComplete(
+			clusterComponent.getShutDownFuture().whenComplete(
 				(ApplicationStatus applicationStatus, Throwable throwable) -> {
 					if (throwable != null) {
-						shutDownAndTerminate(
-							RUNTIME_FAILURE_RETURN_CODE,
+						shutDownAsync(
 							ApplicationStatus.UNKNOWN,
 							ExceptionUtils.stringifyException(throwable),
 							false);
 					} else {
 						// This is the general shutdown path. If a separate more specific shutdown was
 						// already triggered, this will do nothing
-						shutDownAndTerminate(
-							applicationStatus.processExitCode(),
+						shutDownAsync(
 							applicationStatus,
 							null,
 							true);
@@ -382,12 +390,15 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return resultConfiguration;
 	}
 
-	private CompletableFuture<Void> shutDownAsync(
-			boolean cleanupHaData,
+	private CompletableFuture<ApplicationStatus> shutDownAsync(
 			ApplicationStatus applicationStatus,
-			@Nullable String diagnostics) {
+			@Nullable String diagnostics,
+			boolean cleanupHaData) {
 		if (isShutDown.compareAndSet(false, true)) {
-			LOG.info("Stopping {}.", getClass().getSimpleName());
+			LOG.info("Shutting {} down with application status {}. Diagnostics {}.",
+				getClass().getSimpleName(),
+				applicationStatus,
+				diagnostics);
 
 			final CompletableFuture<Void> shutDownApplicationFuture = closeClusterComponent(applicationStatus, diagnostics);
 
@@ -404,7 +415,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 					if (serviceThrowable != null) {
 						terminationFuture.completeExceptionally(serviceThrowable);
 					} else {
-						terminationFuture.complete(null);
+						terminationFuture.complete(applicationStatus);
 					}
 				});
 		}
@@ -412,37 +423,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return terminationFuture;
 	}
 
-	protected void shutDownAndTerminate(
-		int returnCode,
-		ApplicationStatus applicationStatus,
-		@Nullable String diagnostics,
-		boolean cleanupHaData) {
-
-		if (isTerminating.compareAndSet(false, true)) {
-			LOG.info("Shut down and terminate {} with return code {} and application status {}. Diagnostics {}.",
-				getClass().getSimpleName(),
-				returnCode,
-				applicationStatus,
-				diagnostics);
-
-			shutDownAsync(
-				cleanupHaData,
-				applicationStatus,
-				diagnostics).whenComplete(
-				(Void ignored, Throwable t) -> {
-					if (t != null) {
-						LOG.info("Could not properly shut down cluster entrypoint.", t);
-					}
-
-					System.exit(returnCode);
-				});
-		} else {
-			LOG.debug("Concurrent termination call detected. Ignoring termination call with return code {} and application status {}.",
-				returnCode,
-				applicationStatus);
-		}
-	}
-
 	/**
 	 * Deregister the Flink application from the resource management system by signalling
 	 * the {@link ResourceManager}.
@@ -453,10 +433,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	 */
 	private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
 		synchronized (lock) {
-			if (dispatcherComponent != null) {
-				final CompletableFuture<Void> deregisterApplicationFuture = dispatcherComponent.deregisterApplication(applicationStatus, diagnostics);
+			if (clusterComponent != null) {
+				final CompletableFuture<Void> deregisterApplicationFuture = clusterComponent.deregisterApplication(applicationStatus, diagnostics);
 
-				return FutureUtils.runAfterwards(deregisterApplicationFuture, dispatcherComponent::closeAsync);
+				return FutureUtils.runAfterwards(deregisterApplicationFuture, clusterComponent::closeAsync);
 			} else {
 				return CompletableFuture.completedFuture(null);
 			}
@@ -480,7 +460,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	// Abstract methods
 	// --------------------------------------------------
 
-	protected abstract ClusterComponent<?> createDispatcherComponent(Configuration configuration);
+	protected abstract ClusterComponent<?> createClusterComponent(Configuration configuration);
 
 	protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 		Configuration configuration,
@@ -511,6 +491,34 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return configuration;
 	}
 
+	// --------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------
+
+	public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
+
+		final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
+		try {
+			clusterEntrypoint.startCluster();
+		} catch (ClusterEntrypointException e) {
+			LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+		}
+
+		clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
+			final int returnCode;
+
+			if (throwable != null) {
+				returnCode = RUNTIME_FAILURE_RETURN_CODE;
+			} else {
+				returnCode = applicationStatus.processExitCode();
+			}
+
+			LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
+			System.exit(returnCode);
+		});
+	}
+
 	/**
 	 * Execution mode of the {@link MiniDispatcher}.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
new file mode 100644
index 0000000..21d37ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exceptions thrown by the {@link ClusterEntrypoint}.
+ */
+public class ClusterEntrypointException extends FlinkException {
+	private static final long serialVersionUID = -3855286807063809945L;
+
+	public ClusterEntrypointException(String message) {
+		super(message);
+	}
+
+	public ClusterEntrypointException(Throwable cause) {
+		super(cause);
+	}
+
+	public ClusterEntrypointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index e92248c..1675235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -35,7 +35,7 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
 	}
 
@@ -60,6 +60,6 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 
 		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
 
-		entrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index a52975a..1733f49 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
 import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
@@ -61,7 +62,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new JobClusterComponent(
 			YarnResourceManagerFactory.INSTANCE,
 			FileJobGraphRetriever.createFrom(configuration));
@@ -98,6 +99,6 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 			configuration,
 			workingDirectory);
 
-		yarnJobClusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 116e2ff..e0bebfd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -20,8 +20,9 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
-import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
+import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -59,7 +60,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new SessionClusterComponent(YarnResourceManagerFactory.INSTANCE);
 	}
 
@@ -89,6 +90,6 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			configuration,
 			workingDirectory);
 
-		yarnSessionClusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(yarnSessionClusterEntrypoint);
 	}
 }