You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/09 22:48:06 UTC

[3/4] git commit: Don't call the doAs if user is unknown or the same user that is already running

Don't call the doAs if user is unknown or the same user that is already running


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/13a19505
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/13a19505
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/13a19505

Branch: refs/heads/master
Commit: 13a19505e4d77c93a279d0273826974b4f4cb0a4
Parents: f95cb04
Author: tgravescs <tg...@yahoo.com>
Authored: Fri Nov 8 12:04:09 2013 -0600
Committer: tgravescs <tg...@yahoo.com>
Committed: Fri Nov 8 12:04:09 2013 -0600

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 21 +++++++++++++++-----
 .../org/apache/spark/executor/Executor.scala    |  5 +----
 2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/13a19505/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index c29a301..fc1537f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
@@ -34,10 +34,21 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    // if we are already running as the user intended there is no reason to do the doAs. It 
+    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
+    // the user is UNKNOWN then we shouldn't be creating a remote unknown user 
+    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only 
+    // in SparkContext.
+    val currentUser = Option(System.getProperty("user.name")).
+      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/13a19505/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0a4f10c..5c9bb9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -179,10 +179,7 @@ private[spark] class Executor(
       }
     }
 
-    // the runAsUser breaks secure HDFS access. It needs to add the credentials
-    // for the user if running as a user. Comment out for now. 
-    //override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-    override def run(): Unit = {
+    override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)