You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/06/17 00:58:58 UTC
[6/8] flink git commit: [FLINK-2226][YARN] fail application on failed single-job cluster job
[FLINK-2226][YARN] fail application on failed single-job cluster job
Jobs that fail while executing in YARN cluster mode leave the application
container in the "SUCCEEDED" final state. This is fine for long-running Flink
YARN clusters that run multiple jobs, but for a single-job cluster it is
appropriate to mark the application as failed.
This closes #838.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44b969e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44b969e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44b969e0
Branch: refs/heads/release-0.9
Commit: 44b969e0a58347f0262bfc5bc9626362078b373b
Parents: 57810b5
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Jun 15 16:37:35 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Jun 17 00:51:16 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 8 ++---
.../flink/client/FlinkYarnSessionCli.java | 6 ++--
.../org/apache/flink/client/program/Client.java | 1 +
.../flink/runtime/jobgraph/JobStatus.java | 2 +-
.../runtime/yarn/AbstractFlinkYarnCluster.java | 2 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 2 +-
.../org/apache/flink/yarn/FlinkYarnClient.java | 4 +--
.../org/apache/flink/yarn/FlinkYarnCluster.java | 32 +++++++++++++-------
.../apache/flink/yarn/ApplicationClient.scala | 2 +-
.../flink/yarn/ApplicationMasterActor.scala | 10 ++++--
10 files changed, 43 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3e61a3b..e1bacde 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -265,6 +265,7 @@ public class CliFrontend {
return handleError(t);
}
+ int exitCode = 1;
try {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
@@ -276,15 +277,14 @@ public class CliFrontend {
"To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}
- int exitCode = 0;
// check if detached per job yarn cluster is used to start flink
if(yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+ "yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
- executeProgram(program, client, userParallelism, false);
+ exitCode = executeProgram(program, client, userParallelism, false);
} else {
// regular (blocking) execution.
exitCode = executeProgram(program, client, userParallelism, true);
@@ -314,7 +314,7 @@ public class CliFrontend {
finally {
if (yarnCluster != null && !yarnCluster.isDetached()) {
logAndSysout("Shutting down YARN cluster");
- yarnCluster.shutdown();
+ yarnCluster.shutdown(exitCode != 0);
}
if (program != null) {
program.deleteExtractedLibraries();
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 0fa7173..c11edc7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -302,7 +302,7 @@ public class FlinkYarnSessionCli {
if (yarnCluster.hasFailed()) {
System.err.println("The YARN cluster has failed");
- yarnCluster.shutdown();
+ yarnCluster.shutdown(true);
}
// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
@@ -439,7 +439,7 @@ public class FlinkYarnSessionCli {
if (!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
- yarnCluster.shutdown();
+ yarnCluster.shutdown(false);
}
try {
@@ -458,7 +458,7 @@ public class FlinkYarnSessionCli {
public void stop() {
if (yarnCluster != null) {
LOG.info("Command line interface is shutting down the yarnCluster");
- yarnCluster.shutdown();
+ yarnCluster.shutdown(false);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e219a38..c544e8d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -318,6 +318,7 @@ public class Client {
ContextEnvironment.enableLocalExecution(true);
}
+ // Job id has been set in the Client passed to the ContextEnvironment
return new JobSubmissionResult(lastJobId);
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 667a68e..eb7d017 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -51,7 +51,7 @@ public enum JobStatus {
private final boolean terminalState;
- private JobStatus(boolean terminalState) {
+ JobStatus(boolean terminalState) {
this.terminalState = terminalState;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
index 398709e..c2e897f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -30,7 +30,7 @@ public abstract class AbstractFlinkYarnCluster {
public abstract String getWebInterfaceURL();
- public abstract void shutdown();
+ public abstract void shutdown(boolean failApplication);
public abstract boolean hasBeenStopped();
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index ccde5d8..dd32b0d 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -647,7 +647,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Shutting down cluster. All tests passed");
// shutdown cluster
- yarnCluster.shutdown();
+ yarnCluster.shutdown(false);
LOG.info("Finished testJavaAPI()");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 118f4ad..f82f013 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -435,11 +435,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
}
- if( taskManagerMemoryMb > freeClusterMem.containerLimit) {
+ if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than "
+ "the largest possible YARN container: "+freeClusterMem.containerLimit + NOTE_RSC);
}
- if( jobManagerMemoryMb > freeClusterMem.containerLimit) {
+ if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 6dd84d6..e408edb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -297,12 +297,12 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
}
ApplicationReport lastReport = pollingRunner.getLastReport();
if(lastReport == null) {
- LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster. that didn't receive a status so far." +
+ LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster that didn't receive a status so far." +
"The system might be in an erroneous state");
return false;
} else {
YarnApplicationState appState = lastReport.getYarnApplicationState();
- boolean status= (appState == YarnApplicationState.FAILED ||
+ boolean status = (appState == YarnApplicationState.FAILED ||
appState == YarnApplicationState.KILLED);
if(status) {
LOG.warn("YARN reported application state {}", appState);
@@ -381,12 +381,13 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
// -------------------------- Shutdown handling ------------------------
private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
- @Override
- public void shutdown() {
- shutdownInternal(true);
- }
- private void shutdownInternal(boolean removeShutdownHook) {
+ /**
+ * Shutdown the YARN cluster.
+ * @param failApplication whether we should fail the YARN application (in case of errors in Flink)
+ */
+ @Override
+ public void shutdown(boolean failApplication) {
if(!isConnected) {
throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
}
@@ -394,16 +395,25 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
if(hasBeenShutDown.getAndSet(true)) {
return;
}
- // the session is being stopped explicitly.
- if(removeShutdownHook) {
+
+ try {
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+ } catch (IllegalStateException e) {
+ // we are already in the shutdown hook
}
+
if(actorSystem != null){
LOG.info("Sending shutdown request to the Application Master");
if(applicationClient != ActorRef.noSender()) {
try {
+ FinalApplicationStatus finalStatus;
+ if (failApplication) {
+ finalStatus = FinalApplicationStatus.FAILED;
+ } else {
+ finalStatus = FinalApplicationStatus.SUCCEEDED;
+ }
Future<Object> response = Patterns.ask(applicationClient,
- new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED,
+ new Messages.StopYarnSession(finalStatus,
"Flink YARN Client requested shutdown"),
new Timeout(akkaDuration));
@@ -457,7 +467,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
@Override
public void run() {
LOG.info("Shutting down FlinkYarnCluster from the client shutdown hook");
- shutdownInternal(false);
+ shutdown(true);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index d6760ec..ec980d0 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -137,7 +137,7 @@ class ApplicationClient(flinkConfig: Configuration)
case LocalGetYarnClusterStatus =>
sender() ! latestClusterStatus
- // Forward message to Application Master
+ // Forward message to Application Master
case msg: StopAMAfterJob =>
yarnJobManager foreach {
_ forward msg
http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 999610f..411808b 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -26,6 +26,7 @@ import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
import org.apache.flink.runtime.messages.Messages.Acknowledge
@@ -171,8 +172,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
if(jobStatus.status.isTerminalState) {
log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
s"Shutting down YARN session")
- self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
- s"The monitored job with ID ${jobStatus.jobID} has finished.")
+ if (jobStatus.status == JobStatus.FINISHED) {
+ self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
+ s"The monitored job with ID ${jobStatus.jobID} has finished.")
+ } else {
+ self ! StopYarnSession(FinalApplicationStatus.FAILED,
+ s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
+ }
} else {
log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
}