You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2015/02/11 02:19:12 UTC

spark git commit: [SPARK-5493] [core] Add option to impersonate user.

Repository: spark
Updated Branches:
  refs/heads/master e28b6bdbb -> ed167e70c


[SPARK-5493] [core] Add option to impersonate user.

Hadoop has a feature that allows users to impersonate other users
when submitting applications or talking to HDFS, for example. These
impersonated users are referred generally as "proxy users".

Services such as Oozie or Hive use this feature to run applications
as the requesting user.

This change makes SparkSubmit accept a new command line option to
run the application as a proxy user. It also fixes the plumbing
of the user name through the UI (and a couple of other places) to
refer to the correct user running the application, which can be
different than `sys.props("user.name")` even without proxies (e.g.
when using kerberos).

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #4405 from vanzin/SPARK-5493 and squashes the following commits:

df82427 [Marcelo Vanzin] Clarify the reason for the special exception handling.
05bfc08 [Marcelo Vanzin] Remove unneeded annotation.
4840de9 [Marcelo Vanzin] Review feedback.
8af06ff [Marcelo Vanzin] Fix usage string.
2e4fa8f [Marcelo Vanzin] Merge branch 'master' into SPARK-5493
b6c947d [Marcelo Vanzin] Merge branch 'master' into SPARK-5493
0540d38 [Marcelo Vanzin] [SPARK-5493] [core] Add option to impersonate user.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed167e70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed167e70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed167e70

Branch: refs/heads/master
Commit: ed167e70c6d355f39b366ea0d3b92dd26d826a0b
Parents: e28b6bd
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Feb 10 17:19:10 2015 -0800
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Tue Feb 10 17:19:10 2015 -0800

----------------------------------------------------------------------
 bin/utils.sh                                    |  3 +-
 bin/windows-utils.cmd                           |  1 +
 .../org/apache/spark/SecurityManager.scala      |  3 +-
 .../scala/org/apache/spark/SparkContext.scala   | 16 ++----
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 19 +++----
 .../org/apache/spark/deploy/SparkSubmit.scala   | 56 ++++++++++++++++----
 .../spark/deploy/SparkSubmitArguments.scala     |  7 +++
 .../scala/org/apache/spark/util/Utils.scala     | 11 ++++
 8 files changed, 82 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/bin/utils.sh
----------------------------------------------------------------------
diff --git a/bin/utils.sh b/bin/utils.sh
index 2241200..748dbe3 100755
--- a/bin/utils.sh
+++ b/bin/utils.sh
@@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() {
       --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
       --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
       --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
-      --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
+      --total-executor-cores | --executor-cores | --queue | --num-executors | --archives | \
+      --proxy-user)
         if [[ $# -lt 2 ]]; then
           "$SUBMIT_USAGE_FUNCTION"
           exit 1;

http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/bin/windows-utils.cmd
----------------------------------------------------------------------
diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd
index 567b873..0cf9e87 100644
--- a/bin/windows-utils.cmd
+++ b/bin/windows-utils.cmd
@@ -33,6 +33,7 @@ SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--
 SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
 SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
 SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
+SET opts="%opts:~1,-1% \<--proxy-user\>"
 
 echo %1 | findstr %opts% >nul
 if %ERRORLEVEL% equ 0 (

http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 88d35a4..3653f72 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.network.sasl.SecretKeyHolder
+import org.apache.spark.util.Utils
 
 /**
  * Spark class responsible for security.
@@ -203,7 +204,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
 
   // always add the current user and SPARK_USER to the viewAcls
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
-    Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
+    Utils.getCurrentUserName())
 
   setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
   setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 04ca5d1..53fce6b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -191,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")
-  
+
   private[spark] val conf = config.clone()
   conf.validateSettings()
 
@@ -335,11 +335,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   executorEnvs ++= conf.getExecutorEnv
 
   // Set SPARK_USER for user who is running SparkContext.
-  val sparkUser = Option {
-    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
-  }.getOrElse {
-    SparkContext.SPARK_UNKNOWN_USER
-  }
+  val sparkUser = Utils.getCurrentUserName()
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
@@ -826,7 +822,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     assertNotStopped()
-    // The call to new NewHadoopJob automatically adds security credentials to conf, 
+    // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
@@ -1626,8 +1622,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   @deprecated("use defaultMinPartitions", "1.0.0")
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)
 
-  /** 
-   * Default min number of partitions for Hadoop RDDs when not given by user 
+  /**
+   * Default min number of partitions for Hadoop RDDs when not given by user
    * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
    * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
    */
@@ -1844,8 +1840,6 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
 
-  private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
-
   private[spark] val DRIVER_IDENTIFIER = "<driver>"
 
   // The following deprecated objects have already been copied to `object AccumulatorParam` to

http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 03238e9..e0a32fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -52,18 +52,13 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
-    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER) {
-      logDebug("running as user: " + user)
-      val ugi = UserGroupInformation.createRemoteUser(user)
-      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-      ugi.doAs(new PrivilegedExceptionAction[Unit] {
-        def run: Unit = func()
-      })
-    } else {
-      logDebug("running as SPARK_UNKNOWN_USER")
-      func()
-    }
+    val user = Utils.getCurrentUserName()
+    logDebug("running as user: " + user)
+    val ugi = UserGroupInformation.createRemoteUser(user)
+    transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
   }
 
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c4bc505..80cc058 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.deploy
 
 import java.io.{File, PrintStream}
-import java.lang.reflect.{InvocationTargetException, Modifier}
+import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
+import java.security.PrivilegedExceptionAction
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
@@ -79,7 +81,7 @@ object SparkSubmit {
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
   // Exposed for testing
-  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+  private[spark] var exitFn: () => Unit = () => System.exit(1)
   private[spark] var printStream: PrintStream = System.err
   private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
   private[spark] def printErrorAndExit(str: String) = {
@@ -126,6 +128,34 @@ object SparkSubmit {
    */
   private[spark] def submit(args: SparkSubmitArguments): Unit = {
     val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+
+    def doRunMain(): Unit = {
+      if (args.proxyUser != null) {
+        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+          UserGroupInformation.getCurrentUser())
+        try {
+          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+            override def run(): Unit = {
+              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+            }
+          })
+        } catch {
+          case e: Exception =>
+            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
+            // makes the message printed to the output by the JVM not very helpful. Instead,
+            // detect exceptions with empty stack traces here, and treat them differently.
+            if (e.getStackTrace().length == 0) {
+              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+              exitFn()
+            } else {
+              throw e
+            }
+        }
+      } else {
+        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+      }
+    }
+
      // In standalone cluster mode, there are two submission gateways:
      //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
      //   (2) The new REST-based gateway introduced in Spark 1.3
@@ -134,7 +164,7 @@ object SparkSubmit {
     if (args.isStandaloneCluster && args.useRest) {
       try {
         printStream.println("Running Spark using the REST application submission protocol.")
-        runMain(childArgs, childClasspath, sysProps, childMainClass)
+        doRunMain()
       } catch {
         // Fail over to use the legacy submission gateway
         case e: SubmitRestConnectionException =>
@@ -145,7 +175,7 @@ object SparkSubmit {
       }
     // In all other modes, just run the main class as prepared
     } else {
-      runMain(childArgs, childClasspath, sysProps, childMainClass)
+      doRunMain()
     }
   }
 
@@ -457,7 +487,7 @@ object SparkSubmit {
       childClasspath: Seq[String],
       sysProps: Map[String, String],
       childMainClass: String,
-      verbose: Boolean = false) {
+      verbose: Boolean): Unit = {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -507,13 +537,21 @@ object SparkSubmit {
     if (!Modifier.isStatic(mainMethod.getModifiers)) {
       throw new IllegalStateException("The main method in the given main class must be static")
     }
+
+    def findCause(t: Throwable): Throwable = t match {
+      case e: UndeclaredThrowableException =>
+        if (e.getCause() != null) findCause(e.getCause()) else e
+      case e: InvocationTargetException =>
+        if (e.getCause() != null) findCause(e.getCause()) else e
+      case e: Throwable =>
+        e
+    }
+
     try {
       mainMethod.invoke(null, childArgs.toArray)
     } catch {
-      case e: InvocationTargetException => e.getCause match {
-        case cause: Throwable => throw cause
-        case null => throw e
-      }
+      case t: Throwable =>
+        throw findCause(t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index bd0ae26..fa38070 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -57,6 +57,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var pyFiles: String = null
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
+  var proxyUser: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -405,6 +406,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         }
         parse(tail)
 
+      case ("--proxy-user") :: value :: tail =>
+        proxyUser = value
+        parse(tail)
+
       case ("--help" | "-h") :: tail =>
         printUsageAndExit(0)
 
@@ -476,6 +481,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         |
         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
+        |  --proxy-user NAME           User to impersonate when submitting the application.
+        |
         |  --help, -h                  Show this help message and exit
         |  --verbose, -v               Print additional debug output
         |

http://git-wip-us.apache.org/repos/asf/spark/blob/ed167e70/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 61d287c..6af8dd5 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
@@ -1986,6 +1987,16 @@ private[spark] object Utils extends Logging {
         throw new SparkException("Invalid master URL: " + sparkUrl, e)
     }
   }
+
+  /**
+   * Returns the current user name. This is the currently logged in user, unless that's been
+   * overridden by the `SPARK_USER` environment variable.
+   */
+  def getCurrentUserName(): String = {
+    Option(System.getenv("SPARK_USER"))
+      .getOrElse(UserGroupInformation.getCurrentUser().getUserName())
+  }
+
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org