You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/06/17 00:58:58 UTC

[6/8] flink git commit: [FLINK-2226][YARN] fail application on failed single-job cluster job

[FLINK-2226][YARN] fail application on failed single-job cluster job

Failing jobs executed in YARN cluster mode leave the YARN application
in the "SUCCEEDED" final state. This is fine for long-running Flink
YARN sessions where multiple jobs are run, but for single-job clusters
the application should be marked as failed when its job fails.

This closes #838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44b969e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44b969e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44b969e0

Branch: refs/heads/release-0.9
Commit: 44b969e0a58347f0262bfc5bc9626362078b373b
Parents: 57810b5
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Jun 15 16:37:35 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Jun 17 00:51:16 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  8 ++---
 .../flink/client/FlinkYarnSessionCli.java       |  6 ++--
 .../org/apache/flink/client/program/Client.java |  1 +
 .../flink/runtime/jobgraph/JobStatus.java       |  2 +-
 .../runtime/yarn/AbstractFlinkYarnCluster.java  |  2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  2 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  4 +--
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 32 +++++++++++++-------
 .../apache/flink/yarn/ApplicationClient.scala   |  2 +-
 .../flink/yarn/ApplicationMasterActor.scala     | 10 ++++--
 10 files changed, 43 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3e61a3b..e1bacde 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -265,6 +265,7 @@ public class CliFrontend {
 			return handleError(t);
 		}
 
+		int exitCode = 1;
 		try {
 			int userParallelism = options.getParallelism();
 			LOG.debug("User parallelism is set to {}", userParallelism);
@@ -276,15 +277,14 @@ public class CliFrontend {
 						"To use another parallelism, set it at the ./bin/flink client.");
 				userParallelism = client.getMaxSlots();
 			}
-			int exitCode = 0;
 
 			// check if detached per job yarn cluster is used to start flink
 			if(yarnCluster != null && yarnCluster.isDetached()) {
 				logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+						"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
 						"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
-				executeProgram(program, client, userParallelism, false);
+				exitCode = executeProgram(program, client, userParallelism, false);
 			} else {
 				// regular (blocking) execution.
 				exitCode = executeProgram(program, client, userParallelism, true);
@@ -314,7 +314,7 @@ public class CliFrontend {
 		finally {
 			if (yarnCluster != null && !yarnCluster.isDetached()) {
 				logAndSysout("Shutting down YARN cluster");
-				yarnCluster.shutdown();
+				yarnCluster.shutdown(exitCode != 0);
 			}
 			if (program != null) {
 				program.deleteExtractedLibraries();

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 0fa7173..c11edc7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -302,7 +302,7 @@ public class FlinkYarnSessionCli {
 
 				if (yarnCluster.hasFailed()) {
 					System.err.println("The YARN cluster has failed");
-					yarnCluster.shutdown();
+					yarnCluster.shutdown(true);
 				}
 
 				// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
@@ -439,7 +439,7 @@ public class FlinkYarnSessionCli {
 
 				if (!yarnCluster.hasBeenStopped()) {
 					LOG.info("Command Line Interface requested session shutdown");
-					yarnCluster.shutdown();
+					yarnCluster.shutdown(false);
 				}
 
 				try {
@@ -458,7 +458,7 @@ public class FlinkYarnSessionCli {
 	public void stop() {
 		if (yarnCluster != null) {
 			LOG.info("Command line interface is shutting down the yarnCluster");
-			yarnCluster.shutdown();
+			yarnCluster.shutdown(false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e219a38..c544e8d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -318,6 +318,7 @@ public class Client {
 				ContextEnvironment.enableLocalExecution(true);
 			}
 
+			// Job id has been set in the Client passed to the ContextEnvironment
 			return new JobSubmissionResult(lastJobId);
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 667a68e..eb7d017 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -51,7 +51,7 @@ public enum JobStatus {
 	
 	private final boolean terminalState;
 	
-	private JobStatus(boolean terminalState) {
+	JobStatus(boolean terminalState) {
 		this.terminalState = terminalState;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
index 398709e..c2e897f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -30,7 +30,7 @@ public abstract class AbstractFlinkYarnCluster {
 
 	public abstract String getWebInterfaceURL();
 
-	public abstract void shutdown();
+	public abstract void shutdown(boolean failApplication);
 
 	public abstract boolean hasBeenStopped();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index ccde5d8..dd32b0d 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -647,7 +647,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 		LOG.info("Shutting down cluster. All tests passed");
 		// shutdown cluster
-		yarnCluster.shutdown();
+		yarnCluster.shutdown(false);
 		LOG.info("Finished testJavaAPI()");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 118f4ad..f82f013 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -435,11 +435,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 					+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
 
 		}
-		if( taskManagerMemoryMb > freeClusterMem.containerLimit) {
+		if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
 			LOG.warn("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than "
 					+ "the largest possible YARN container: "+freeClusterMem.containerLimit + NOTE_RSC);
 		}
-		if( jobManagerMemoryMb > freeClusterMem.containerLimit) {
+		if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
 			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
 					+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 6dd84d6..e408edb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -297,12 +297,12 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		}
 		ApplicationReport lastReport = pollingRunner.getLastReport();
 		if(lastReport == null) {
-			LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster. that didn't receive a status so far." +
+			LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster that didn't receive a status so far." +
 					"The system might be in an erroneous state");
 			return false;
 		} else {
 			YarnApplicationState appState = lastReport.getYarnApplicationState();
-			boolean status= (appState == YarnApplicationState.FAILED ||
+			boolean status = (appState == YarnApplicationState.FAILED ||
 					appState == YarnApplicationState.KILLED);
 			if(status) {
 				LOG.warn("YARN reported application state {}", appState);
@@ -381,12 +381,13 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	// -------------------------- Shutdown handling ------------------------
 
 	private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
-	@Override
-	public void shutdown() {
-		shutdownInternal(true);
-	}
 
-	private void shutdownInternal(boolean removeShutdownHook) {
+	/**
+	 * Shutdown the YARN cluster.
+	 * @param failApplication whether we should fail the YARN application (in case of errors in Flink)
+	 */
+	@Override
+	public void shutdown(boolean failApplication) {
 		if(!isConnected) {
 			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
 		}
@@ -394,16 +395,25 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		if(hasBeenShutDown.getAndSet(true)) {
 			return;
 		}
-		// the session is being stopped explicitly.
-		if(removeShutdownHook) {
+
+		try {
 			Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+		} catch (IllegalStateException e) {
+			// we are already in the shutdown hook
 		}
+
 		if(actorSystem != null){
 			LOG.info("Sending shutdown request to the Application Master");
 			if(applicationClient != ActorRef.noSender()) {
 				try {
+					FinalApplicationStatus finalStatus;
+					if (failApplication) {
+						finalStatus = FinalApplicationStatus.FAILED;
+					} else {
+						finalStatus = FinalApplicationStatus.SUCCEEDED;
+					}
 					Future<Object> response = Patterns.ask(applicationClient,
-							new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED,
+							new Messages.StopYarnSession(finalStatus,
 									"Flink YARN Client requested shutdown"),
 							new Timeout(akkaDuration));
 
@@ -457,7 +467,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		@Override
 		public void run() {
 			LOG.info("Shutting down FlinkYarnCluster from the client shutdown hook");
-			shutdownInternal(false);
+			shutdown(true);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index d6760ec..ec980d0 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -137,7 +137,7 @@ class ApplicationClient(flinkConfig: Configuration)
     case LocalGetYarnClusterStatus =>
       sender() ! latestClusterStatus
 
-      // Forward message to Application Master
+    // Forward message to Application Master
     case msg: StopAMAfterJob =>
       yarnJobManager foreach {
         _ forward msg

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 999610f..411808b 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -26,6 +26,7 @@ import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
 import org.apache.flink.runtime.messages.Messages.Acknowledge
@@ -171,8 +172,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
           if(jobStatus.status.isTerminalState) {
             log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
               s"Shutting down YARN session")
-            self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
-              s"The monitored job with ID ${jobStatus.jobID} has finished.")
+            if (jobStatus.status == JobStatus.FINISHED) {
+              self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
+                s"The monitored job with ID ${jobStatus.jobID} has finished.")
+            } else {
+              self ! StopYarnSession(FinalApplicationStatus.FAILED,
+                s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
+            }
           } else {
             log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
           }