Posted to commits@spark.apache.org by jo...@apache.org on 2015/01/21 23:42:47 UTC

spark git commit: Make sure only owner can read / write to directories created for the job.

Repository: spark
Updated Branches:
  refs/heads/branch-1.2 bb8bd11da -> 079b3be81


Make sure only owner can read / write to directories created for the job.

Whenever a directory is created by the Utils.createDirectory utility method,
immediately restrict its permissions so that only the owner has access to its
contents.
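
For illustration only (this snippet is not part of the patch), the intended
post-condition can be checked with plain JDK calls. Assuming a POSIX file
system and the patched Utils, a freshly created temp directory should now
carry owner-only (drwx------) permissions:

    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermission._

    // Hypothetical check of the guarantee this patch introduces:
    val dir = org.apache.spark.util.Utils.createTempDir()
    val perms = Files.getPosixFilePermissions(dir.toPath)
    assert(!perms.contains(GROUP_READ) && !perms.contains(GROUP_WRITE))
    assert(!perms.contains(OTHERS_READ) && !perms.contains(OTHERS_WRITE))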

Signed-off-by: Josh Rosen <jo...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/079b3be8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/079b3be8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/079b3be8

Branch: refs/heads/branch-1.2
Commit: 079b3be81264b446f927739f26ed9f426611d83f
Parents: bb8bd11
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Jan 21 14:38:14 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed Jan 21 14:38:14 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpFileServer.scala |  2 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |  2 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  2 +-
 .../apache/spark/storage/DiskBlockManager.scala | 36 ++-------
 .../scala/org/apache/spark/util/Utils.scala     | 77 ++++++++++++++------
 python/pyspark/context.py                       |  3 +-
 6 files changed, 68 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 677c5e0..3f33332 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -36,7 +36,7 @@ private[spark] class HttpFileServer(
   var serverUri : String = null
 
   def initialize() {
-    baseDir = Utils.createTempDir()
+    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
     fileDir = new File(baseDir, "files")
     jarDir = new File(baseDir, "jars")
     fileDir.mkdir()
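
For context (not part of this diff), Utils.getLocalDir simply returns the
first of the configured local root directories, so after this patch the HTTP
file server's baseDir lands inside a directory that getOrCreateLocalRootDirs
has already created with owner-only permissions. Roughly, paraphrased from
Utils.scala:

    // Paraphrased sketch, not part of this diff:
    def getLocalDir(conf: SparkConf): String =
      getOrCreateLocalRootDirs(conf)(0)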

http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 48a9d98..e6ebbff 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -339,7 +339,7 @@ object SparkEnv extends Logging {
     // this is a temporary directory; in distributed mode, this is the executor's current working
     // directory.
     val sparkFilesDir: String = if (isDriver) {
-      Utils.createTempDir().getAbsolutePath
+      Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
     } else {
       "."
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 31d6958..ea98051 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -151,7 +151,7 @@ private[broadcast] object HttpBroadcast extends Logging {
   }
 
   private def createServer(conf: SparkConf) {
-    broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
+    broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast")
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
     server =
       new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")

http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index d79ed76..ffaac4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.storage
 
-import java.io.File
-import java.text.SimpleDateFormat
-import java.util.{Date, Random, UUID}
+import java.util.UUID
+import java.io.{IOException, File}
 
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.executor.ExecutorExitCode
@@ -37,7 +36,6 @@ import org.apache.spark.util.Utils
 private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
   extends Logging {
 
-  private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   private[spark]
   val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
 
@@ -121,33 +119,15 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def createLocalDirs(conf: SparkConf): Array[File] = {
-    val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
     Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
-      var foundLocalDir = false
-      var localDir: File = null
-      var localDirId: String = null
-      var tries = 0
-      val rand = new Random()
-      while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
-        tries += 1
-        try {
-          localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
-          localDir = new File(rootDir, s"spark-local-$localDirId")
-          if (!localDir.exists) {
-            foundLocalDir = localDir.mkdirs()
-          }
-        } catch {
-          case e: Exception =>
-            logWarning(s"Attempt $tries to create local dir $localDir failed", e)
-        }
-      }
-      if (!foundLocalDir) {
-        logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." +
-                  " Ignoring this directory.")
-        None
-      } else {
+      try {
+        val localDir = Utils.createDirectory(rootDir, "blockmgr")
         logInfo(s"Created local directory at $localDir")
         Some(localDir)
+      } catch {
+        case e: IOException =>
+          logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
+          None
       }
     }
   }
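
The hand-rolled retry loop deleted above now lives in Utils.createDirectory
(see the Utils.scala hunks below), which is why a single IOException catch
suffices here. Reconstructed from those hunks, with the elided attempt-limit
check filled in as an assumption, the consolidated helper behaves roughly
like:

    // Sketch only; the IOException throw is assumed from the catch clause
    // above and from MAX_DIR_CREATION_ATTEMPTS, which the patch adds to Utils.
    def createDirectory(root: String, namePrefix: String = "spark"): File = {
      var attempts = 0
      var dir: File = null
      while (dir == null) {
        attempts += 1
        if (attempts > MAX_DIR_CREATION_ATTEMPTS) {
          throw new IOException(s"Failed to create a directory under $root")
        }
        try {
          dir = new File(root, "spark-" + UUID.randomUUID.toString)
          if (dir.exists() || !dir.mkdirs()) {
            dir = null                  // name collision or mkdirs failure
          } else if (!chmod700(dir)) {
            dir.delete()                // could not secure it; roll back
            dir = null
          }
        } catch { case e: SecurityException => dir = null }
      }
      dir
    }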

http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index cdb322d..8d230ff 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,8 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
+  private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -247,12 +249,27 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * JDK equivalent of `chmod 700 file`.
+   *
+   * @param file the file whose permissions will be modified
+   * @return true if the permissions were successfully changed, false otherwise.
+   */
+  def chmod700(file: File): Boolean = {
+    file.setReadable(false, false) &&
+    file.setReadable(true, true) &&
+    file.setWritable(false, false) &&
+    file.setWritable(true, true) &&
+    file.setExecutable(false, false) &&
+    file.setExecutable(true, true)
+  }
+
+  /**
    * Create a directory inside the given parent directory. The directory is guaranteed to be
    * newly created, and is not marked for automatic deletion.
    */
-  def createDirectory(root: String): File = {
+  def createDirectory(root: String, namePrefix: String = "spark"): File = {
     var attempts = 0
-    val maxAttempts = 10
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
     var dir: File = null
     while (dir == null) {
       attempts += 1
@@ -264,6 +281,11 @@ private[spark] object Utils extends Logging {
         dir = new File(root, "spark-" + UUID.randomUUID.toString)
         if (dir.exists() || !dir.mkdirs()) {
           dir = null
+        } else {
+          if (!chmod700(dir)) {
+            dir.delete()
+            dir = null
+          }
         }
       } catch { case e: SecurityException => dir = null; }
     }
@@ -275,8 +297,10 @@ private[spark] object Utils extends Logging {
    * Create a temporary directory inside the given parent directory. The directory will be
    * automatically deleted when the VM shuts down.
    */
-  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
-    val dir = createDirectory(root)
+  def createTempDir(
+      root: String = System.getProperty("java.io.tmpdir"),
+      namePrefix: String = "spark"): File = {
+    val dir = createDirectory(root, namePrefix)
     registerShutdownDeleteDir(dir)
     dir
   }
@@ -599,26 +623,35 @@ private[spark] object Utils extends Logging {
    * If no directories could be created, this will return an empty list.
    */
   private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
-    val confValue = if (isRunningInYarnContainer(conf)) {
+    if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
-      // to what Yarn on this system said was available.
-      getYarnLocalDirs(conf)
+      // to what Yarn on this system said was available. Note this assumes that Yarn has
+      // created the directories already, and that they are secured so that only the
+      // user has access to them.
+      getYarnLocalDirs(conf).split(",")
     } else {
-      Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
-        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
-    }
-    val rootDirs = confValue.split(',')
-    logDebug(s"Getting/creating local root dirs at '$confValue'")
-
-    rootDirs.flatMap { rootDir =>
-      val localDir: File = new File(rootDir)
-      val foundLocalDir = localDir.exists || localDir.mkdirs()
-      if (!foundLocalDir) {
-        logError(s"Failed to create local root dir in $rootDir.  Ignoring this directory.")
-        None
-      } else {
-        Some(rootDir)
-      }
+      // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
+      // configuration to point to a secure directory. So create a subdirectory with restricted
+      // permissions under each listed directory.
+      Option(conf.getenv("SPARK_LOCAL_DIRS"))
+        .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+        .split(",")
+        .flatMap { root =>
+          try {
+            val rootDir = new File(root)
+            if (rootDir.exists || rootDir.mkdirs()) {
+              Some(createDirectory(root).getAbsolutePath())
+            } else {
+              logError(s"Failed to create dir in $root. Ignoring this directory.")
+              None
+            }
+          } catch {
+            case e: IOException =>
+            logError(s"Failed to create local root dir in $root. Ignoring this directory.")
+            None
+          }
+        }
+        .toArray
     }
   }
 

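Taken together, callers can now obtain a secured scratch directory in one
call. A hedged usage sketch (the "example" prefix and the SparkConf value are
illustrative, not taken from the patch):

    val conf = new SparkConf()
    // Root comes from SPARK_LOCAL_DIRS, spark.local.dir, or java.io.tmpdir;
    // the returned directory is created mode 700 and registered for
    // deletion when the JVM shuts down.
    val scratch = Utils.createTempDir(Utils.getLocalDir(conf), "example")
    // chmod700 is also usable on its own and reports whether all six
    // java.io.File permission setters succeeded:
    val locked: Boolean = Utils.chmod700(scratch)
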
http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 3935413..b5c2421 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -189,7 +189,8 @@ class SparkContext(object):
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
         self._temp_dir = \
-            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
+            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir, "pyspark") \
+                .getAbsolutePath()
 
         # profiling stats collected for each PythonRDD
         self._profile_stats = []

