You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/05/01 20:45:08 UTC

samza git commit: SAMZA-1250: JobRunner.kill doesn't terminate cleanly with YarnJob.

Repository: samza
Updated Branches:
  refs/heads/master 92ae4c628 -> a1e03af0d


SAMZA-1250: JobRunner.kill doesn't terminate cleanly with YarnJob.

1. The ClientHelper now also checks inactive application IDs, so it can get the status of terminated jobs in addition to running jobs.
2. JobRunner.kill() waits for any finish, not just successful finish.
3. A killed job is now considered successful.

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #152 from jmakes/samza-1250


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a1e03af0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a1e03af0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a1e03af0

Branch: refs/heads/master
Commit: a1e03af0da684184fbd95ec4278fc35fe4b4e28b
Parents: 92ae4c6
Author: Jacob Maes <jm...@linkedin.com>
Authored: Mon May 1 13:44:54 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon May 1 13:44:54 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/job/JobRunner.scala  |  2 +-
 .../apache/samza/job/yarn/ClientHelper.scala    | 38 +++++++++++++-------
 .../org/apache/samza/job/yarn/YarnJob.scala     | 17 +++++----
 3 files changed, 38 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a1e03af0/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index b2f5bd0..f34db99 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -137,7 +137,7 @@ class JobRunner(config: Config) extends Logging {
     info("waiting for job to terminate")
 
     // Wait until the job has terminated, then exit.
-    Option(job.waitForStatus(SuccessfulFinish, 5000)) match {
+    Option(job.waitForFinish(5000)) match {
       case Some(appStatus) => {
         if (SuccessfulFinish.equals(appStatus)) {
           info("job terminated successfully - " + appStatus)

http://git-wip-us.apache.org/repos/asf/samza/blob/a1e03af0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index dc1ead3..f4fc757 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -236,22 +236,31 @@ class ClientHelper(conf: Configuration) extends Logging {
     * @return        the active application ids.
     */
   def getActiveApplicationIds(appName: String): List[ApplicationId] = {
-    val getAppsRsp = yarnClient.getApplications
+    val applicationReports = yarnClient.getApplications
 
-    getAppsRsp
+    applicationReports
       .asScala
-        .filter(appRep => ((
-            Running.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
-            || New.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
-            )
-          && appName.equals(appRep.getName)))
-        .map(appRep => appRep.getApplicationId)
+        .filter(applicationReport => isActiveApplication(applicationReport)
+          && appName.equals(applicationReport.getName))
+        .map(applicationReport => applicationReport.getApplicationId)
         .toList
   }
 
+  def getPreviousApplicationIds(appName: String): List[ApplicationId] = {
+    val applicationReports = yarnClient.getApplications
+
+    applicationReports
+      .asScala
+      .filter(applicationReport => (!(isActiveApplication(applicationReport))
+        && appName.equals(applicationReport.getName)))
+      .map(applicationReport => applicationReport.getApplicationId)
+      .toList
+  }
+
   def status(appId: ApplicationId): Option[ApplicationStatus] = {
     val statusResponse = yarnClient.getApplicationReport(appId)
-    convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
+    info("Got state: %s, final status: %s".format(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus))
+    toAppStatus(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
   }
 
   def kill(appId: ApplicationId) {
@@ -271,15 +280,20 @@ class ClientHelper(conf: Configuration) extends Logging {
     status match {
       case Some(status) => getAppsRsp
         .asScala
-        .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
+        .filter(appRep => status.equals(toAppStatus(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
         .toList
       case None => getAppsRsp.asScala.toList
     }
   }
 
-  private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
+  private def isActiveApplication(applicationReport: ApplicationReport): Boolean = {
+    (Running.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get)
+    || New.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get))
+  }
+
+  private def toAppStatus(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
     (state, status) match {
-      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
+      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) | (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) => Some(SuccessfulFinish)
       case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish)
       case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
       case _ => Some(Running)

http://git-wip-us.apache.org/repos/asf/samza/blob/a1e03af0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 030f914..5230b0f 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -140,7 +140,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
         client.status(appId).getOrElse(null)
       case None =>
         logger.info("Unable to report status because no applicationId could be found.")
-        null
+        ApplicationStatus.SuccessfulFinish
     }
   }
 
@@ -171,12 +171,17 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
             logger.info("Fetching status from YARN for application name %s" format applicationName)
             val applicationIds = client.getActiveApplicationIds(applicationName)
 
-            applicationIds.foreach(applicationId =>  {
-              logger.info("Found applicationId %s for applicationName %s" format(applicationId, applicationName))
-            })
+            if (applicationIds.nonEmpty) {
+              // Only return latest one, because there should only be one.
+              logger.info("Matching active ids: " + applicationIds.sorted.reverse.toString())
+              applicationIds.sorted.reverse.headOption
+            } else {
+              // Couldn't find an active applicationID. Use the latest finished ID.
+              val pastApplicationIds = client.getPreviousApplicationIds(applicationName)
+              // Don't log because there could be many, many previous app IDs for an application.
+              pastApplicationIds.sorted.reverse.headOption  // Get latest
+            }
 
-            // Only return one, because there should only be one.
-            applicationIds.headOption
           case None =>
             None
         }