You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/04/07 21:32:13 UTC
samza git commit: SAMZA-1089: Enable YarnJob and ClientHelper to kill
a job by name rather than YARN ApplicationID
Repository: samza
Updated Branches:
refs/heads/master d6bd2d707 -> fb51bfae2
SAMZA-1089: Enable YarnJob and ClientHelper to kill a job by name rather than YARN ApplicationID
Missed a couple of files in the previous commit. These changes enable YarnJob to kill, and get the status of, a job based on the job name rather than the YARN ApplicationId.
These changes have been manually verified in a Yarn cluster at LI.
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #114 from jmakes/samza-1089-3
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb51bfae
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb51bfae
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb51bfae
Branch: refs/heads/master
Commit: fb51bfae2fd6365554fedcc47db636c339b55714
Parents: d6bd2d7
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Apr 7 14:32:02 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Apr 7 14:32:02 2017 -0700
----------------------------------------------------------------------
.../apache/samza/job/yarn/ClientHelper.scala | 35 ++++++++++++++++--
.../org/apache/samza/job/yarn/YarnJob.scala | 38 +++++++++++++++++---
2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fb51bfae/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index c7b1b6d..dc1ead3 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -220,6 +220,35 @@ class ClientHelper(conf: Configuration) extends Logging {
appId
}
+ /**
+ * Gets the list of Yarn [[org.apache.hadoop.yarn.api.records.ApplicationId]]
+ * corresponding to the specified appName and are "active".
+ * <p>
+ * In this context, "active" means that the application is starting or running
+ * and is not in any terminated state.
+ * <p>
+ * In Samza, an appName should be unique and there should only be one active
+ * applicationId for a given appName, but this can be violated in unusual cases
+ * like while troubleshooting a new application. So, this method returns as many
+ * active application ids as it finds.
+ *
+ * @param appName the app name as found in the Name column in the Yarn application list.
+ * @return the active application ids.
+ */
+ def getActiveApplicationIds(appName: String): List[ApplicationId] = {
+ val getAppsRsp = yarnClient.getApplications
+
+ getAppsRsp
+ .asScala
+ .filter(appRep => ((
+ Running.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
+ || New.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
+ )
+ && appName.equals(appRep.getName)))
+ .map(appRep => appRep.getApplicationId)
+ .toList
+ }
+
def status(appId: ApplicationId): Option[ApplicationStatus] = {
val statusResponse = yarnClient.getApplicationReport(appId)
convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
@@ -251,7 +280,7 @@ class ClientHelper(conf: Configuration) extends Logging {
private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
(state, status) match {
case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
- case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
+ case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish)
case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
case _ => Some(Running)
}
@@ -337,6 +366,8 @@ class ClientHelper(conf: Configuration) extends Logging {
* Cleanup application staging directory.
*/
def cleanupStagingDir(): Unit = {
- YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf))
+ if (jobContext != null) {
+ YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fb51bfae/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 46dc4d1..030f914 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -134,16 +134,22 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
}
def getStatus: ApplicationStatus = {
- appId match {
- case Some(appId) => client.status(appId).getOrElse(null)
- case None => null
+ getAppId match {
+ case Some(appId) =>
+ logger.info("Getting status for applicationId %s" format appId)
+ client.status(appId).getOrElse(null)
+ case None =>
+ logger.info("Unable to report status because no applicationId could be found.")
+ null
}
}
def kill: YarnJob = {
- appId match {
+ // getAppId only returns one appID. Run multiple times to kill dupes (erroneous case)
+ getAppId match {
case Some(appId) =>
try {
+ logger.info("Killing applicationId {}", appId)
client.kill(appId)
} finally {
client.cleanupStagingDir
@@ -152,4 +158,28 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
}
this
}
+
+ private def getAppId: Option[ApplicationId] = {
+ appId match {
+ case Some(applicationId) =>
+ appId
+ case None =>
+ // Get by name
+ config.getName match {
+ case Some(jobName) =>
+ val applicationName = "%s_%s" format(jobName, config.getJobId.getOrElse(1))
+ logger.info("Fetching status from YARN for application name %s" format applicationName)
+ val applicationIds = client.getActiveApplicationIds(applicationName)
+
+ applicationIds.foreach(applicationId => {
+ logger.info("Found applicationId %s for applicationName %s" format(applicationId, applicationName))
+ })
+
+ // Only return one, because there should only be one.
+ applicationIds.headOption
+ case None =>
+ None
+ }
+ }
+ }
}