Posted to commits@spark.apache.org by ma...@apache.org on 2014/04/05 02:16:45 UTC

git commit: [SPARK-1198] Allow pipes tasks to run in different sub-directories

Repository: spark
Updated Branches:
  refs/heads/master a02b535d5 -> 198892fe8


[SPARK-1198] Allow pipes tasks to run in different sub-directories

This works as is on Linux/Mac/etc. but doesn't cover working on Windows. Here I use ln -sf for symlinks. Putting this up for comments on that. Do we perhaps want to create some classes for doing shell commands - Linux vs. Windows? Is there some other way we want to do this? I assume we are still supporting jdk1.6?

Also, should I update the Java API for pipes to allow this parameter?
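
For reference, a minimal usage sketch of the new parameter from the Scala API (the sample data, the "wc -l" command, the local master, and the PipeSeparateDirExample name are illustrative only, not part of this patch):

    import org.apache.spark.{SparkConf, SparkContext}

    object PipeSeparateDirExample {
      def main(args: Array[String]) {
        val sc = new SparkContext(
          new SparkConf().setAppName("pipe-example").setMaster("local[2]"))
        val nums = sc.makeRDD(1 to 100, 4)
        // With separateWorkingDir = true each task runs the external command in
        // its own ./tasks/<uuid> directory, populated with symlinks to the jars
        // and files shipped with the job, so tasks don't clobber each other's
        // local files.
        val counts = nums.pipe(Seq("wc", "-l"), separateWorkingDir = true)
        counts.collect().foreach(println)
        sc.stop()
      }
    }

Note that each per-task ./tasks/<uuid> directory is removed after the subprocess exits, while the top-level ./tasks directory is left behind (the new test deletes it explicitly).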

Author: Thomas Graves <tg...@apache.org>

Closes #128 from tgravescs/SPARK1198 and squashes the following commits:

abc1289 [Thomas Graves] remove extra tag in pom file
ba23fc0 [Thomas Graves] Add support for symlink on windows, remove commons-io usage
da4b221 [Thomas Graves] Merge branch 'master' of https://github.com/tgravescs/spark into SPARK1198
61be271 [Thomas Graves] Fix file name filter
6b783bd [Thomas Graves] style fixes
1ab49ca [Thomas Graves] Add support for running pipe tasks in separate directories


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/198892fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/198892fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/198892fe

Branch: refs/heads/master
Commit: 198892fe8d39a2fad585fa2a7579d8b478456c33
Parents: a02b535
Author: Thomas Graves <tg...@apache.org>
Authored: Fri Apr 4 17:16:31 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Apr 4 17:16:31 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/PipedRDD.scala   | 64 +++++++++++++++++++-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  7 ++-
 .../scala/org/apache/spark/util/Utils.scala     | 45 +++++++++++++-
 .../scala/org/apache/spark/PipedRDDSuite.scala  | 28 ++++++++-
 4 files changed, 137 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/198892fe/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 4250a9d..41ae0fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.rdd
 
+import java.io.File
+import java.io.FilenameFilter
+import java.io.IOException
 import java.io.PrintWriter
 import java.util.StringTokenizer
 
@@ -27,6 +30,7 @@ import scala.io.Source
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
     command: Seq[String],
     envVars: Map[String, String],
     printPipeContext: (String => Unit) => Unit,
-    printRDDElement: (T, String => Unit) => Unit)
+    printRDDElement: (T, String => Unit) => Unit,
+    separateWorkingDir: Boolean)
   extends RDD[String](prev) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
       command: String,
       envVars: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null) =
-    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+      printRDDElement: (T, String => Unit) => Unit = null,
+      separateWorkingDir: Boolean = false) =
+    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
+      separateWorkingDir)
 
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  /**
+   * A FilenameFilter that accepts anything that isn't equal to the name passed in.
+   * @param filterName name of the file or directory to leave out
+   */
+  class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
+    def accept(dir: File, name: String): Boolean = {
+      !name.equals(filterName)
+    }
+  }
+
   override def compute(split: Partition, context: TaskContext): Iterator[String] = {
     val pb = new ProcessBuilder(command)
     // Add the environmental variables to the process.
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
       currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
     }
 
+    // When the spark.worker.separated.working.directory option is turned on, each
+    // task will be run in a separate working directory. This should resolve
+    // file access conflict issues between tasks.
+    val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
+    var workInTaskDirectory = false
+    logDebug("taskDirectory = " + taskDirectory)
+    if (separateWorkingDir) {
+      val currentDir = new File(".")
+      logDebug("currentDir = " + currentDir.getAbsolutePath())
+      val taskDirFile = new File(taskDirectory)
+      taskDirFile.mkdirs()
+
+      try {
+        val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
+
+        // Need to add symlinks to jars, files, and directories.  On Yarn we could have
+        // directories and other files not known to the SparkContext that were added via the
+        // Hadoop distributed cache.  We also don't want to symlink to the /tasks directories we
+        // are creating here.
+        for (file <- currentDir.list(tasksDirFilter)) {
+          val fileWithDir = new File(currentDir, file)
+          Utils.symlink(new File(fileWithDir.getAbsolutePath()),
+            new File(taskDirectory + "/" + fileWithDir.getName()))
+        }
+        pb.directory(taskDirFile)
+        workInTaskDirectory = true
+      } catch {
+        case e: Exception => logError("Unable to setup task working directory: " + e.getMessage +
+          " (" + taskDirectory + ")")
+      }
+    }
+
     val proc = pb.start()
     val env = SparkEnv.get
 
@@ -112,6 +161,15 @@ class PipedRDD[T: ClassTag](
           if (exitStatus != 0) {
             throw new Exception("Subprocess exited with status " + exitStatus)
           }
+
+          // cleanup task working directory if used
+          if (workInTaskDirectory) {
+            scala.util.control.Exception.ignoring(classOf[IOException]) {
+              Utils.deleteRecursively(new File(taskDirectory))
+            }
+            logDebug("Removed task working directory " + taskDirectory)
+          }
+
           false
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/198892fe/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ce2b8ac..08c42c5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -481,16 +481,19 @@ abstract class RDD[T: ClassTag](
    *                        instead of constructing a huge String to concat all the elements:
    *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
    *                          for (e <- record._2){f(e)}
+   * @param separateWorkingDir Use separate working directories for each task.
    * @return the result RDD
    */
   def pipe(
       command: Seq[String],
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
+      printRDDElement: (T, String => Unit) => Unit = null,
+      separateWorkingDir: Boolean = false): RDD[String] = {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
-      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
+      if (printRDDElement ne null) sc.clean(printRDDElement) else null,
+      separateWorkingDir)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/198892fe/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 62ee704..737b765 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -43,6 +44,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
  */
 private[spark] object Utils extends Logging {
 
+  val osName = System.getProperty("os.name")
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -521,9 +524,10 @@ private[spark] object Utils extends Logging {
 
   /**
    * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
    */
   def deleteRecursively(file: File) {
-    if (file.isDirectory) {
+    if ((file.isDirectory) && !isSymlink(file)) {
       for (child <- listFilesSafely(file)) {
         deleteRecursively(child)
       }
@@ -537,6 +541,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Check to see if the given file is a symbolic link.
+   */
+  def isSymlink(file: File): Boolean = {
+    if (file == null) throw new NullPointerException("File must not be null")
+    if (osName.startsWith("Windows")) return false
+    val fileInCanonicalDir = if (file.getParent() == null) {
+      file
+    } else {
+      new File(file.getParentFile().getCanonicalFile(), file.getName())
+    }
+
+    if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
+      return false
+    } else {
+      return true
+    }
+  }
+
+  /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
    */
   def memoryStringToMb(str: String): Int = {
@@ -898,6 +921,26 @@ private[spark] object Utils extends Logging {
     count
   }
 
+  /**
+   * Creates a symlink. Note that jdk1.7 has Files.createSymbolicLink, but it is not used
+   * here for jdk1.6 support. On Windows this does a copy; everything else uses "ln -sf".
+   * @param src absolute path to the source
+   * @param dst relative path for the destination
+   */
+  def symlink(src: File, dst: File) {
+    if (!src.isAbsolute()) {
+      throw new IOException("Source must be absolute")
+    }
+    if (dst.isAbsolute()) {
+      throw new IOException("Destination must be relative")
+    }
+    val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf"
+    import scala.sys.process._
+    (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
+       (logInfo(line)))
+  }
+
+
   /** Return the class name of the given object, removing all dollar signs */
   def getFormattedClassName(obj: AnyRef) = {
     obj.getClass.getSimpleName.replace("$", "")
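
As an aside, the isSymlink check above boils down to comparing a file's canonical path with its absolute path. A self-contained sketch of that idea (SymlinkCheck and looksLikeSymlink are hypothetical names, not from this patch; the real helper also short-circuits to false on Windows):

    import java.io.File

    object SymlinkCheck {
      // Returns true if the file itself appears to be a symbolic link.
      def looksLikeSymlink(file: File): Boolean = {
        // Canonicalize the parent directory first so that symlinks higher up
        // the path don't affect the check for this particular file.
        val fileInCanonicalDir =
          if (file.getParent == null) file
          else new File(file.getParentFile.getCanonicalFile, file.getName)
        // For a regular file the canonical and absolute forms match; for a
        // symlink the canonical form resolves to the link target instead.
        !fileInCanonicalDir.getCanonicalFile.equals(fileInCanonicalDir.getAbsoluteFile)
      }
    }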

http://git-wip-us.apache.org/repos/asf/spark/blob/198892fe/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 6e7fd55..627e9b5 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark
 
-import org.scalatest.FunSuite
+import java.io.File
+
+import com.google.common.io.Files
 
+import org.scalatest.FunSuite
 
 import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
@@ -126,6 +129,29 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("basic pipe with separate working directory") {
+    if (testCommandAvailable("cat")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+      val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+      val c = piped.collect()
+      assert(c.size === 4)
+      assert(c(0) === "1")
+      assert(c(1) === "2")
+      assert(c(2) === "3")
+      assert(c(3) === "4")
+      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+      val collectPwd = pipedPwd.collect()
+      assert(collectPwd(0).contains("tasks/"))
+      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+      // make sure symlinks were created
+      assert(pipedLs.length > 0)
+      // clean up top level tasks directory
+      new File("tasks").delete()
+    } else {
+      assert(true)
+    }
+  }
+
   test("test pipe exports map_input_file") {
     testExportInputFile("map_input_file")
   }