Posted to commits@spark.apache.org by me...@apache.org on 2014/07/04 00:18:00 UTC

git commit: [SPARK-1516]Throw exception in yarn client instead of run system.exit

Repository: spark
Updated Branches:
  refs/heads/branch-0.9 b3f4245ff -> 0d3d5ce2b


[SPARK-1516]Throw exception in yarn client instead of run system.exit

    [SPARK-1516] Throw an exception in the YARN client instead of calling System.exit directly.

    All the changes are in the "org.apache.spark.deploy.yarn" package:
        1) Throw IllegalArgumentException in ClientArguments instead of exiting directly.
        2) In Client's main method, exit with code 1 if an exception is caught; otherwise exit with code 0.
        3) In YarnClientSchedulerBackend's start method, exit with code 1 if an IllegalArgumentException is caught; any other exception propagates to the caller.
        4) Fix some message typos in Client.scala.
    After the fix, users who embed the Spark YARN client in their own applications no longer have the host application terminated when an argument is wrong or when the run finishes; a short sketch of this usage follows.
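
    A minimal sketch of the usage this enables (hypothetical embedding application, not part of this commit; it assumes only the constructors and run() shown in the diff below):

        import org.apache.spark.SparkConf
        import org.apache.spark.deploy.yarn.{Client, ClientArguments}

        object EmbeddedYarnLauncher {
          def main(argStrings: Array[String]): Unit = {
            val sparkConf = new SparkConf()
            try {
              // With the fix, bad arguments surface as IllegalArgumentException
              // and a finished run simply returns, instead of the client
              // calling System.exit and killing the host JVM.
              val args = new ClientArguments(argStrings, sparkConf)
              new Client(args, sparkConf).run()
            } catch {
              case e: IllegalArgumentException =>
                // Recover or report inside the host application.
                Console.err.println("YARN client rejected arguments: " + e.getMessage)
            }
            // The host process keeps running either way.
          }
        }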

+CC dbtsai mengxr

Author: John Zhao <co...@gmail.com>

Closes #1099 from codeboyyong/branch-0.9 and squashes the following commits:

00144b5 [John Zhao] Use e.printStackTrace() instead of Console.err.println(e.getMessage) so that the client console gets more useful information when something goes wrong (illustrated in the sketch after these commit entries).
addcecb [John Zhao] [SPARK-1516] Throw exception in yarn client instead of running System.exit directly.
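
Illustration only (not part of the diff): printing just the message drops the failure location, while printStackTrace() keeps the exception class and the full stack:

    object TraceDemo extends App {
      try {
        throw new IllegalArgumentException("Unknown/unsupported param --foo")
      } catch {
        case e: Exception =>
          Console.err.println(e.getMessage) // one line, no origin
          e.printStackTrace()               // class name plus full stack trace
      }
    }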


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d3d5ce2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d3d5ce2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d3d5ce2

Branch: refs/heads/branch-0.9
Commit: 0d3d5ce2b26b7cbaa7312cfdd53f6ab6603ae7f6
Parents: b3f4245
Author: John Zhao <co...@gmail.com>
Authored: Thu Jul 3 15:17:51 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Jul 3 15:17:51 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 33 ++++++++++++------
 .../spark/deploy/yarn/ClientArguments.scala     | 16 ++++-----
 .../cluster/YarnClientSchedulerBackend.scala    | 15 ++++++---
 .../org/apache/spark/deploy/yarn/Client.scala   | 35 +++++++++++++-------
 4 files changed, 63 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d3d5ce2/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9e5e2d5..9998210 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -93,7 +93,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def validateArgs() = {
@@ -109,7 +108,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -135,17 +134,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
 
   def verifyClusterResources(app: GetNewApplicationResponse) = {
     val maxMem = app.getMaximumResourceCapability().getMemory()
-    logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
+    logInfo("Max mem capability of a single resource in this cluster " + maxMem)
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.workerMemory > maxMem) {
-      logError("the worker size is to large to run on this cluster " + args.workerMemory)
-      System.exit(1)
+      val errorMessage = s"the worker size is too large to run on this cluster ${args.workerMemory}"
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("AM size is to large to run on this cluster " + amMem)
-      System.exit(1)
+      val errorMessage = s"AM size is too large to run on this cluster $amMem"
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -229,8 +230,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new IllegalArgumentException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
@@ -475,9 +477,18 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val sparkConf = new SparkConf
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        e.printStackTrace()
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps

http://git-wip-us.apache.org/repos/asf/spark/blob/0d3d5ce2/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1419f21..6bbe87d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -109,11 +109,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
         case Nil =>
           if (userJar == null || userClass == null) {
-            printUsageAndExit(1)
+            throw new IllegalArgumentException(getUsageMessage())
           }
 
         case _ =>
-          printUsageAndExit(1, args)
+          throw new IllegalArgumentException(getUsageMessage(args))
       }
     }
 
@@ -122,11 +122,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   }
 
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println(
+  def getUsageMessage(unknownParam: Any = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam \n" else ""
+
+    message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH             Path to your application's JAR file (required)\n" +
@@ -143,8 +142,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "  --addJars jars             Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
       "  --files files              Comma separated list of files to be distributed with the job.\n" +
       "  --archives archives        Comma separated list of archives to be distributed with the job."
-      )
-    System.exit(exitCode)
+
   }
 
 }
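
A minimal sketch of the resulting ClientArguments behavior (illustrative caller with a made-up flag; only the constructor and exception from the hunks above are assumed):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.ClientArguments

    object UsageDemo extends App {
      try {
        // An unrecognized flag now throws instead of exiting the JVM.
        new ClientArguments(Array("--not-a-real-flag"), new SparkConf())
      } catch {
        case e: IllegalArgumentException =>
          // The message is "Unknown/unsupported param ..." followed by the
          // usage text built by getUsageMessage.
          Console.err.println(e.getMessage)
      }
    }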

http://git-wip-us.apache.org/repos/asf/spark/blob/0d3d5ce2/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 22e55e0..ac786af 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -73,10 +73,17 @@ private[spark] class YarnClientSchedulerBackend(
     .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
       
     logDebug("ClientArguments called with: " + argsArrayBuf)
-    val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    client = new Client(args, conf)
-    appId = client.runApp()
-    waitForApp()
+    try {
+      val args = new ClientArguments(argsArrayBuf.toArray, conf)
+      client = new Client(args, conf)
+      appId = client.runApp()
+      waitForApp()
+    } catch {
+      case e: IllegalArgumentException => {
+        e.printStackTrace()
+        System.exit(1)
+      }
+    }
   }
 
   def waitForApp() {

http://git-wip-us.apache.org/repos/asf/spark/blob/0d3d5ce2/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6ff8c6c..1bfcaf6 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -113,7 +113,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   // TODO(harvey): This could just go in ClientArguments.
@@ -130,7 +129,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -160,15 +159,18 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.workerMemory > maxMem) {
-      logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
-        format(args.workerMemory, maxMem))
-      System.exit(1)
+      val errorMessage =
+        "Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster."
+        .format(args.workerMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
-        format(args.amMemory, maxMem))
-      System.exit(1)
+      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster"
+        .format(args.amMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -244,8 +246,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new IllegalArgumentException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
@@ -489,9 +492,17 @@ object Client {
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
-    val args = new ClientArguments(argStrings, sparkConf)
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        e.printStackTrace()
+        System.exit(1)
+      }
+    }
 
-    new Client(args, sparkConf).run()
+    System.exit(0)
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps