You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/03/20 22:51:18 UTC
git commit: SPARK-1032. If Yarn app fails before registering, app master stays around long after
Repository: spark
Updated Branches:
refs/heads/branch-0.9 748f002b3 -> c6630d363
SPARK-1032. If Yarn app fails before registering, app master stays around long after
This reopens https://github.com/apache/incubator-spark/pull/648 against the new repo.
Author: Sandy Ryza <sa...@cloudera.com>
Closes #28 from sryza/sandy-spark-1032 and squashes the following commits:
5953f50 [Sandy Ryza] SPARK-1032. If Yarn app fails before registering, app master stays around long after
Conflicts:
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6630d36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6630d36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6630d36
Branch: refs/heads/branch-0.9
Commit: c6630d363a73184c8dcca9f2c0c6fc3f5c8e47bf
Parents: 748f002
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Fri Feb 28 09:40:47 2014 -0600
Committer: Thomas Graves <tg...@apache.org>
Committed: Thu Mar 20 16:50:44 2014 -0500
----------------------------------------------------------------------
.../spark/deploy/yarn/ApplicationMaster.scala | 34 +++++++++++++-------
.../spark/deploy/yarn/ApplicationMaster.scala | 22 +++++++++----
2 files changed, 38 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c6630d36/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7aa1894..e045b9f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -66,6 +66,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
+ private var registered = false
+
private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
@@ -114,7 +116,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
waitForSparkContextInitialized()
// Do this after spark master is up and SparkContext is created so that we can register UI Url
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ synchronized {
+ if (!isFinished) {
+ registerApplicationMaster()
+ registered = true
+ }
+ }
// Allocate all containers
allocateWorkers()
@@ -212,7 +219,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
var count = 0
val waitTime = 10000L
val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
+ && !isFinished) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -345,17 +353,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true
+
+ logInfo("finishApplicationMaster with " + status)
+ if (registered) {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ // Set tracking url to empty since we don't have a history server.
+ finishReq.setTrackingUrl("")
+ resourceManager.finishApplicationMaster(finishReq)
+ }
}
-
- logInfo("finishApplicationMaster with " + status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setDiagnostics(diagnostics)
- // Set tracking url to empty since we don't have a history server.
- finishReq.setTrackingUrl("")
- resourceManager.finishApplicationMaster(finishReq)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/c6630d36/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 45a7f38..b312a42 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -68,6 +68,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
+ private var registered = false
+
private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
@@ -103,7 +105,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
waitForSparkContextInitialized()
// Do this after Spark master is up and SparkContext is created so that we can register UI Url.
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ synchronized {
+ if (!isFinished) {
+ registerApplicationMaster()
+ registered = true
+ }
+ }
// Allocate all containers
allocateWorkers()
@@ -184,7 +191,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
var numTries = 0
val waitTime = 10000L
val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
- while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
+ while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
+ && !isFinished) {
logInfo("Waiting for Spark context initialization ... " + numTries)
numTries = numTries + 1
ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -317,11 +325,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true
- }
- logInfo("finishApplicationMaster with " + status)
- // Set tracking URL to empty since we don't have a history server.
- amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+ logInfo("finishApplicationMaster with " + status)
+ if (registered) {
+ // Set tracking URL to empty since we don't have a history server.
+ amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+ }
+ }
}
/**