You are viewing a plain text version of this content. The canonical link to it (a hyperlink on the original archive page) was not preserved in this copy.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/01/21 23:42:47 UTC
spark git commit: Make sure only owner can read / write to directories created for the job.
Repository: spark
Updated Branches:
refs/heads/branch-1.2 bb8bd11da -> 079b3be81
Make sure only owner can read / write to directories created for the job.
Whenever a directory is created by the utility method, immediately restrict
its permissions so that only the owner has access to its contents.
Signed-off-by: Josh Rosen <jo...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/079b3be8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/079b3be8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/079b3be8
Branch: refs/heads/branch-1.2
Commit: 079b3be81264b446f927739f26ed9f426611d83f
Parents: bb8bd11
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Jan 21 14:38:14 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed Jan 21 14:38:14 2015 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/HttpFileServer.scala | 2 +-
.../main/scala/org/apache/spark/SparkEnv.scala | 2 +-
.../apache/spark/broadcast/HttpBroadcast.scala | 2 +-
.../apache/spark/storage/DiskBlockManager.scala | 36 ++-------
.../scala/org/apache/spark/util/Utils.scala | 77 ++++++++++++++------
python/pyspark/context.py | 3 +-
6 files changed, 68 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 677c5e0..3f33332 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -36,7 +36,7 @@ private[spark] class HttpFileServer(
var serverUri : String = null
def initialize() {
- baseDir = Utils.createTempDir()
+ baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 48a9d98..e6ebbff 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -339,7 +339,7 @@ object SparkEnv extends Logging {
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
val sparkFilesDir: String = if (isDriver) {
- Utils.createTempDir().getAbsolutePath
+ Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
} else {
"."
}
http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 31d6958..ea98051 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -151,7 +151,7 @@ private[broadcast] object HttpBroadcast extends Logging {
}
private def createServer(conf: SparkConf) {
- broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
+ broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast")
val broadcastPort = conf.getInt("spark.broadcast.port", 0)
server =
new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index d79ed76..ffaac4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -17,9 +17,8 @@
package org.apache.spark.storage
-import java.io.File
-import java.text.SimpleDateFormat
-import java.util.{Date, Random, UUID}
+import java.util.UUID
+import java.io.{IOException, File}
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
@@ -37,7 +36,6 @@ import org.apache.spark.util.Utils
private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
extends Logging {
- private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private[spark]
val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
@@ -121,33 +119,15 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
private def createLocalDirs(conf: SparkConf): Array[File] = {
- val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
- var foundLocalDir = false
- var localDir: File = null
- var localDirId: String = null
- var tries = 0
- val rand = new Random()
- while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
- tries += 1
- try {
- localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
- localDir = new File(rootDir, s"spark-local-$localDirId")
- if (!localDir.exists) {
- foundLocalDir = localDir.mkdirs()
- }
- } catch {
- case e: Exception =>
- logWarning(s"Attempt $tries to create local dir $localDir failed", e)
- }
- }
- if (!foundLocalDir) {
- logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." +
- " Ignoring this directory.")
- None
- } else {
+ try {
+ val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
Some(localDir)
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
+ None
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index cdb322d..8d230ff 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,8 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()
+ private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -247,12 +249,27 @@ private[spark] object Utils extends Logging {
}
/**
+ * JDK equivalent of `chmod 700 file`.
+ *
+ * @param file the file whose permissions will be modified
+ * @return true if the permissions were successfully changed, false otherwise.
+ */
+ def chmod700(file: File): Boolean = {
+ file.setReadable(false, false) &&
+ file.setReadable(true, true) &&
+ file.setWritable(false, false) &&
+ file.setWritable(true, true) &&
+ file.setExecutable(false, false) &&
+ file.setExecutable(true, true)
+ }
+
+ /**
* Create a directory inside the given parent directory. The directory is guaranteed to be
* newly created, and is not marked for automatic deletion.
*/
- def createDirectory(root: String): File = {
+ def createDirectory(root: String, namePrefix: String = "spark"): File = {
var attempts = 0
- val maxAttempts = 10
+ val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
var dir: File = null
while (dir == null) {
attempts += 1
@@ -264,6 +281,11 @@ private[spark] object Utils extends Logging {
- dir = new File(root, "spark-" + UUID.randomUUID.toString)
+ dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
if (dir.exists() || !dir.mkdirs()) {
dir = null
+ } else {
+ if (!chmod700(dir)) {
+ dir.delete()
+ dir = null
+ }
}
} catch { case e: SecurityException => dir = null; }
}
@@ -275,8 +297,10 @@ private[spark] object Utils extends Logging {
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
- def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
- val dir = createDirectory(root)
+ def createTempDir(
+ root: String = System.getProperty("java.io.tmpdir"),
+ namePrefix: String = "spark"): File = {
+ val dir = createDirectory(root, namePrefix)
registerShutdownDeleteDir(dir)
dir
}
@@ -599,26 +623,35 @@ private[spark] object Utils extends Logging {
* If no directories could be created, this will return an empty list.
*/
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
- val confValue = if (isRunningInYarnContainer(conf)) {
+ if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
- // to what Yarn on this system said was available.
- getYarnLocalDirs(conf)
+ // to what Yarn on this system said was available. Note this assumes that Yarn has
+ // created the directories already, and that they are secured so that only the
+ // user has access to them.
+ getYarnLocalDirs(conf).split(",")
} else {
- Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
- conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
- }
- val rootDirs = confValue.split(',')
- logDebug(s"Getting/creating local root dirs at '$confValue'")
-
- rootDirs.flatMap { rootDir =>
- val localDir: File = new File(rootDir)
- val foundLocalDir = localDir.exists || localDir.mkdirs()
- if (!foundLocalDir) {
- logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
- None
- } else {
- Some(rootDir)
- }
+ // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
+ // configuration to point to a secure directory. So create a subdirectory with restricted
+ // permissions under each listed directory.
+ Option(conf.getenv("SPARK_LOCAL_DIRS"))
+ .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ .split(",")
+ .flatMap { root =>
+ try {
+ val rootDir = new File(root)
+ if (rootDir.exists || rootDir.mkdirs()) {
+ Some(createDirectory(root).getAbsolutePath())
+ } else {
+ logError(s"Failed to create dir in $root. Ignoring this directory.")
+ None
+ }
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to create local root dir in $root. Ignoring this directory.", e)
+ None
+ }
+ }
+ .toArray
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/079b3be8/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 3935413..b5c2421 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -189,7 +189,8 @@ class SparkContext(object):
# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
- self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
+ self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir, "pyspark") \
+ .getAbsolutePath()
# profiling stats collected for each PythonRDD
self._profile_stats = []
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org