You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/03 13:02:30 UTC

[3/3] flink git commit: [FLINK-8900] [yarn] Set correct application status when job is finished

[FLINK-8900] [yarn] Set correct application status when job is finished

This closes #5944


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b267184
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b267184
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b267184

Branch: refs/heads/release-1.5
Commit: 5b26718404a89744fd4ddcdd963a712ec581222c
Parents: 38a4cd8
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 27 18:57:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 3 11:16:35 2018 +0200

----------------------------------------------------------------------
 .../entrypoint/MesosJobClusterEntrypoint.java   |  4 ++++
 .../clusterframework/ApplicationStatus.java     | 25 ++++++++++++++++++++
 .../runtime/dispatcher/MiniDispatcher.java      | 23 ++++++++++++++----
 .../runtime/entrypoint/ClusterEntrypoint.java   |  4 +++-
 .../entrypoint/JobClusterEntrypoint.java        | 10 +++++++-
 .../runtime/dispatcher/MiniDispatcherTest.java  | 16 ++++++-------
 .../entrypoint/YarnJobClusterEntrypoint.java    | 13 ++++++++++
 7 files changed, 80 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 02f561a..cf661cb 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -25,6 +25,7 @@ import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameter
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -180,6 +181,9 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 			});
 	}
 
+	@Override
+	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {}
+
 	public static void main(String[] args) {
 		// startup checks and logging
 		EnvironmentInformation.logEnvironmentInfo(LOG, MesosJobClusterEntrypoint.class.getSimpleName(), args);

http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
index b7bb84e..963e9ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
 /**
  * The status of an application.
  */
@@ -52,4 +54,27 @@ public enum ApplicationStatus {
 		return processExitCode;
 	}
 
+	/**
+	 * Derives the ApplicationStatus that should be used for a job that resulted in the given
+	 * job status. If the job is not yet in a globally terminal state, this method returns
+	 * {@link #UNKNOWN}.
+	 */
+	public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
+		if (jobStatus == null) {
+			return UNKNOWN;
+		}
+		else {
+			switch (jobStatus) {
+				case FAILED:
+					return FAILED;
+				case CANCELED:
+					return CANCELED;
+				case FINISHED:
+					return SUCCEEDED;
+
+				default:
+					return UNKNOWN;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 3f45824..4361e08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -40,6 +41,8 @@ import javax.annotation.Nullable;
 
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Mini Dispatcher which is instantiated as the dispatcher component by the {@link JobClusterEntrypoint}.
  *
@@ -52,6 +55,8 @@ public class MiniDispatcher extends Dispatcher {
 
 	private final JobClusterEntrypoint.ExecutionMode executionMode;
 
+	private final CompletableFuture<ApplicationStatus> jobTerminationFuture;
+
 	public MiniDispatcher(
 			RpcService rpcService,
 			String endpointId,
@@ -84,7 +89,12 @@ public class MiniDispatcher extends Dispatcher {
 			fatalErrorHandler,
 			restAddress);
 
-		this.executionMode = executionMode;
+		this.executionMode = checkNotNull(executionMode);
+		this.jobTerminationFuture = new CompletableFuture<>();
+	}
+
+	public CompletableFuture<ApplicationStatus> getJobTerminationFuture() {
+		return jobTerminationFuture;
 	}
 
 	@Override
@@ -109,7 +119,12 @@ public class MiniDispatcher extends Dispatcher {
 
 		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
 			// terminate the MiniDispatcher once we served the first JobResult successfully
-			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+			jobResultFuture.thenAccept((JobResult result) -> {
+				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
+						ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
+
+				jobTerminationFuture.complete(status);
+			});
 		}
 
 		return jobResultFuture;
@@ -121,7 +136,7 @@ public class MiniDispatcher extends Dispatcher {
 
 		if (executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
 			// shut down since we don't have to wait for the execution result retrieval
-			shutDown();
+			jobTerminationFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
 		}
 	}
 
@@ -130,6 +145,6 @@ public class MiniDispatcher extends Dispatcher {
 		super.jobNotFinished(jobId);
 
 		// shut down since we have done our job
-		shutDown();
+		jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 42a3d1a..f7a5858 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -241,6 +241,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 						LOG.info("Could not properly terminate the Dispatcher.", throwable);
 					}
 
+					// This is the general shutdown path. If a separate more specific shutdown was
+					// already triggered, this will do nothing
 					shutDownAndTerminate(
 						SUCCESS_RETURN_CODE,
 						ApplicationStatus.SUCCEEDED,
@@ -578,7 +580,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return terminationFuture;
 	}
 
-	private void shutDownAndTerminate(
+	protected void shutDownAndTerminate(
 		int returnCode,
 		ApplicationStatus applicationStatus,
 		@Nullable String diagnostics,

http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 95d9c74..ea7cbe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.entrypoint;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -44,6 +45,7 @@ import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
@@ -106,7 +108,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 
 		final ExecutionMode executionMode = ExecutionMode.valueOf(executionModeValue);
 
-		return new MiniDispatcher(
+		final MiniDispatcher dispatcher = new MiniDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME,
 			configuration,
@@ -122,7 +124,13 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			restAddress,
 			jobGraph,
 			executionMode);
+
+		registerShutdownActions(dispatcher.getJobTerminationFuture());
+
+		return dispatcher;
 	}
 
 	protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
+
+	protected abstract void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 5790b8a..6dfb243 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -178,8 +179,8 @@ public class MiniDispatcherTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that in detached mode, the {@link MiniDispatcher} will terminate after the job
-	 * has completed.
+	 * Tests that in detached mode, the {@link MiniDispatcher} will complete the future that
+	 * signals job termination.
 	 */
 	@Test
 	public void testTerminationAfterJobCompletion() throws Exception {
@@ -197,7 +198,7 @@ public class MiniDispatcherTest extends TestLogger {
 			resultFuture.complete(archivedExecutionGraph);
 
 			// wait until we terminate
-			miniDispatcher.getTerminationFuture().get();
+			miniDispatcher.getJobTerminationFuture().get();
 		} finally {
 			RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
 		}
@@ -222,9 +223,7 @@ public class MiniDispatcherTest extends TestLogger {
 
 			resultFuture.complete(archivedExecutionGraph);
 
-			final CompletableFuture<Void> terminationFuture = miniDispatcher.getTerminationFuture();
-
-			assertThat(terminationFuture.isDone(), is(false));
+			assertFalse(miniDispatcher.getTerminationFuture().isDone());
 
 			final DispatcherGateway dispatcherGateway = miniDispatcher.getSelfGateway(DispatcherGateway.class);
 
@@ -233,9 +232,8 @@ public class MiniDispatcherTest extends TestLogger {
 			final JobResult jobResult = jobResultFuture.get();
 
 			assertThat(jobResult.getJobId(), is(jobGraph.getJobID()));
-
-			terminationFuture.get();
-		} finally {
+		}
+		finally {
 			RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index ca4cb9c..8cc3579 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
@@ -51,6 +52,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Entry point for Yarn per-job clusters.
@@ -131,6 +133,17 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 		}
 	}
 
+	@Override
+	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
+		terminationFuture.thenAccept((status) ->
+			shutDownAndTerminate(status.processExitCode(), status, null, true));
+	}
+
+	// ------------------------------------------------------------------------
+	//  The executable entry point for the Yarn Application Master Process
+	//  for a single Flink job.
+	// ------------------------------------------------------------------------
+
 	public static void main(String[] args) {
 		// startup checks and logging
 		EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);