You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/03 13:02:30 UTC
[3/3] flink git commit: [FLINK-8900] [yarn] Set correct application status when job is finished
[FLINK-8900] [yarn] Set correct application status when job is finished
This closes #5944
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b267184
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b267184
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b267184
Branch: refs/heads/release-1.5
Commit: 5b26718404a89744fd4ddcdd963a712ec581222c
Parents: 38a4cd8
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 27 18:57:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 3 11:16:35 2018 +0200
----------------------------------------------------------------------
.../entrypoint/MesosJobClusterEntrypoint.java | 4 ++++
.../clusterframework/ApplicationStatus.java | 25 ++++++++++++++++++++
.../runtime/dispatcher/MiniDispatcher.java | 23 ++++++++++++++----
.../runtime/entrypoint/ClusterEntrypoint.java | 4 +++-
.../entrypoint/JobClusterEntrypoint.java | 10 +++++++-
.../runtime/dispatcher/MiniDispatcherTest.java | 16 ++++++-------
.../entrypoint/YarnJobClusterEntrypoint.java | 13 ++++++++++
7 files changed, 80 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 02f561a..cf661cb 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -25,6 +25,7 @@ import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameter
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -180,6 +181,9 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
});
}
+ @Override
+ protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {}
+
public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, MesosJobClusterEntrypoint.class.getSimpleName(), args);
http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
index b7bb84e..963e9ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.clusterframework;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
/**
* The status of an application.
*/
@@ -52,4 +54,27 @@ public enum ApplicationStatus {
return processExitCode;
}
+ /**
+ * Derives the ApplicationStatus that should be used for a job that resulted in the given
+ * job status. If the job is not yet in a globally terminal state, this method returns
+ * {@link #UNKNOWN}.
+ */
+ public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
+ if (jobStatus == null) {
+ return UNKNOWN;
+ }
+ else {
+ switch (jobStatus) {
+ case FAILED:
+ return FAILED;
+ case CANCELED:
+ return CANCELED;
+ case FINISHED:
+ return SUCCEEDED;
+
+ default:
+ return UNKNOWN;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 3f45824..4361e08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -40,6 +41,8 @@ import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Mini Dispatcher which is instantiated as the dispatcher component by the {@link JobClusterEntrypoint}.
*
@@ -52,6 +55,8 @@ public class MiniDispatcher extends Dispatcher {
private final JobClusterEntrypoint.ExecutionMode executionMode;
+ private final CompletableFuture<ApplicationStatus> jobTerminationFuture;
+
public MiniDispatcher(
RpcService rpcService,
String endpointId,
@@ -84,7 +89,12 @@ public class MiniDispatcher extends Dispatcher {
fatalErrorHandler,
restAddress);
- this.executionMode = executionMode;
+ this.executionMode = checkNotNull(executionMode);
+ this.jobTerminationFuture = new CompletableFuture<>();
+ }
+
+ public CompletableFuture<ApplicationStatus> getJobTerminationFuture() {
+ return jobTerminationFuture;
}
@Override
@@ -109,7 +119,12 @@ public class MiniDispatcher extends Dispatcher {
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
// terminate the MiniDispatcher once we served the first JobResult successfully
- jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+ jobResultFuture.thenAccept((JobResult result) -> {
+ ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
+ ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
+
+ jobTerminationFuture.complete(status);
+ });
}
return jobResultFuture;
@@ -121,7 +136,7 @@ public class MiniDispatcher extends Dispatcher {
if (executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
// shut down since we don't have to wait for the execution result retrieval
- shutDown();
+ jobTerminationFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
}
}
@@ -130,6 +145,6 @@ public class MiniDispatcher extends Dispatcher {
super.jobNotFinished(jobId);
// shut down since we have done our job
- shutDown();
+ jobTerminationFuture.complete(ApplicationStatus.UNKNOWN);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 42a3d1a..f7a5858 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -241,6 +241,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
LOG.info("Could not properly terminate the Dispatcher.", throwable);
}
+ // This is the general shutdown path. If a separate more specific shutdown was
+ // already triggered, this will do nothing
shutDownAndTerminate(
SUCCESS_RETURN_CODE,
ApplicationStatus.SUCCEEDED,
@@ -578,7 +580,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
return terminationFuture;
}
- private void shutDownAndTerminate(
+ protected void shutDownAndTerminate(
int returnCode,
ApplicationStatus applicationStatus,
@Nullable String diagnostics,
http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 95d9c74..ea7cbe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.entrypoint;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -44,6 +45,7 @@ import org.apache.flink.util.FlinkException;
import javax.annotation.Nullable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
@@ -106,7 +108,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
final ExecutionMode executionMode = ExecutionMode.valueOf(executionModeValue);
- return new MiniDispatcher(
+ final MiniDispatcher dispatcher = new MiniDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME,
configuration,
@@ -122,7 +124,13 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
restAddress,
jobGraph,
executionMode);
+
+ registerShutdownActions(dispatcher.getJobTerminationFuture());
+
+ return dispatcher;
}
protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
+
+ protected abstract void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 5790b8a..6dfb243 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -178,8 +179,8 @@ public class MiniDispatcherTest extends TestLogger {
}
/**
- * Tests that in detached mode, the {@link MiniDispatcher} will terminate after the job
- * has completed.
+ * Tests that in detached mode, the {@link MiniDispatcher} will complete the future that
+ * signals job termination.
*/
@Test
public void testTerminationAfterJobCompletion() throws Exception {
@@ -197,7 +198,7 @@ public class MiniDispatcherTest extends TestLogger {
resultFuture.complete(archivedExecutionGraph);
// wait until we terminate
- miniDispatcher.getTerminationFuture().get();
+ miniDispatcher.getJobTerminationFuture().get();
} finally {
RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
}
@@ -222,9 +223,7 @@ public class MiniDispatcherTest extends TestLogger {
resultFuture.complete(archivedExecutionGraph);
- final CompletableFuture<Void> terminationFuture = miniDispatcher.getTerminationFuture();
-
- assertThat(terminationFuture.isDone(), is(false));
+ assertFalse(miniDispatcher.getTerminationFuture().isDone());
final DispatcherGateway dispatcherGateway = miniDispatcher.getSelfGateway(DispatcherGateway.class);
@@ -233,9 +232,8 @@ public class MiniDispatcherTest extends TestLogger {
final JobResult jobResult = jobResultFuture.get();
assertThat(jobResult.getJobId(), is(jobGraph.getJobID()));
-
- terminationFuture.get();
- } finally {
+ }
+ finally {
RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b267184/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index ca4cb9c..8cc3579 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,6 +19,7 @@
package org.apache.flink.yarn.entrypoint;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
@@ -51,6 +52,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
* Entry point for Yarn per-job clusters.
@@ -131,6 +133,17 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
}
}
+ @Override
+ protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
+ terminationFuture.thenAccept((status) ->
+ shutDownAndTerminate(status.processExitCode(), status, null, true));
+ }
+
+ // ------------------------------------------------------------------------
+ // The executable entry point for the Yarn Application Master Process
+ // for a single Flink job.
+ // ------------------------------------------------------------------------
+
public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);