You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/12/20 00:24:55 UTC

spark git commit: [SPARK-4896] don’t redundantly overwrite executor JAR deps

Repository: spark
Updated Branches:
  refs/heads/master cdb2c645a -> 7981f9697


[SPARK-4896] don’t redundantly overwrite executor JAR deps

Author: Ryan Williams <ry...@gmail.com>

Closes #2848 from ryan-williams/fetch-file and squashes the following commits:

c14daff [Ryan Williams] Fix copy that was changed to a move inadvertently
8e39c16 [Ryan Williams] code review feedback
788ed41 [Ryan Williams] don’t redundantly overwrite executor JAR deps


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7981f969
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7981f969
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7981f969

Branch: refs/heads/master
Commit: 7981f969762e77f1752ef8f86c546d4fc32a1a4f
Parents: cdb2c64
Author: Ryan Williams <ry...@gmail.com>
Authored: Fri Dec 19 15:24:41 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri Dec 19 15:24:41 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 170 ++++++++++++-------
 1 file changed, 107 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7981f969/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d16233a..5e1cb0c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -385,16 +385,12 @@ private[spark] object Utils extends Logging {
       } finally {
         lock.release()
       }
-      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
-        if (conf.getBoolean("spark.files.overwrite", false)) {
-          targetFile.delete()
-          logInfo((s"File $targetFile exists and does not match contents of $url, " +
-            s"replacing it with $url"))
-        } else {
-          throw new SparkException(s"File $targetFile exists and does not match contents of $url")
-        }
-      }
-      Files.copy(cachedFile, targetFile)
+      copyFile(
+        url,
+        cachedFile,
+        targetFile,
+        conf.getBoolean("spark.files.overwrite", false)
+      )
     } else {
       doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
     }
@@ -412,6 +408,104 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Download `in` to `tempFile`, then move `tempFile` to `destFile` via `copyFile`.
+   *
+   * If `destFile` already exists:
+   *   - no-op if its contents equal those of `tempFile`,
+   *   - throw an exception if `fileOverwrite` is false,
+   *   - attempt to overwrite it otherwise.
+   *
+   * @param url URL that `in` originated from, used in log and error messages.
+   * @param in InputStream to download.
+   * @param tempFile File path to download `in` to.
+   * @param destFile File path to move `tempFile` to.
+   * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
+   *                      the downloaded `tempFile`
+   */
+  private def downloadFile(
+      url: String,
+      in: InputStream,
+      tempFile: File,
+      destFile: File,
+      fileOverwrite: Boolean): Unit = {
+
+    try {
+      val out = new FileOutputStream(tempFile)
+      Utils.copyStream(in, out, closeStreams = true)
+      copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
+    } finally {
+      // Clean up `tempFile` whenever it was not moved to `destFile`, e.g. `copyFile` returned
+      // early because the contents already matched, or an exception was thrown above.
+      if (tempFile.exists()) {
+        tempFile.delete()
+      }
+    }
+  }
+
+  /**
+   * Copy `sourceFile` to `destFile`, or move it there if `removeSourceFile` is true.
+   *
+   * If `destFile` already exists:
+   *   - no-op if its contents equal those of `sourceFile`,
+   *   - throw an exception if `fileOverwrite` is false,
+   *   - otherwise delete it first, throwing a SparkException if the delete fails.
+   *
+   * @param url URL that `sourceFile` originated from, used in log and error messages.
+   * @param sourceFile File path to copy/move from.
+   * @param destFile File path to copy/move to.
+   * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
+   *                      `sourceFile`
+   * @param removeSourceFile Whether to move `sourceFile` to `destFile` (`Files.move`) instead of
+   *                         copying it (`Files.copy`).
+   */
+  private def copyFile(
+      url: String,
+      sourceFile: File,
+      destFile: File,
+      fileOverwrite: Boolean,
+      removeSourceFile: Boolean = false): Unit = {
+
+    if (destFile.exists) {
+      if (!Files.equal(sourceFile, destFile)) {
+        if (fileOverwrite) {
+          logInfo(
+            s"File $destFile exists and does not match contents of $url, replacing it with $url"
+          )
+          if (!destFile.delete()) {
+            throw new SparkException(
+              "Failed to delete %s while attempting to overwrite it with %s".format(
+                destFile.getAbsolutePath,
+                sourceFile.getAbsolutePath
+              )
+            )
+          }
+        } else {
+          throw new SparkException(
+            s"File $destFile exists and does not match contents of $url")
+        }
+      } else {
+        // Do nothing if the file contents are the same, i.e. this file has been copied
+        // previously.
+        logInfo(
+          "%s has been previously copied to %s".format(
+            sourceFile.getAbsolutePath,
+            destFile.getAbsolutePath
+          )
+        )
+        return
+      }
+    }
+
+    // `destFile` does not exist (or was just deleted above). Copy or move the source there.
+    if (removeSourceFile) {
+      Files.move(sourceFile, destFile)
+    } else {
+      logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
+      Files.copy(sourceFile, destFile)
+    }
+  }
+
+  /**
    * Download a file to target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
@@ -449,67 +543,17 @@ private[spark] object Utils extends Logging {
         uc.setReadTimeout(timeout)
         uc.connect()
         val in = uc.getInputStream()
-        val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, closeStreams = true)
-        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          if (fileOverwrite) {
-            targetFile.delete()
-            logInfo(("File %s exists and does not match contents of %s, " +
-              "replacing it with %s").format(targetFile, url, url))
-          } else {
-            tempFile.delete()
-            throw new SparkException(
-              "File " + targetFile + " exists and does not match contents of" + " " + url)
-          }
-        }
-        Files.move(tempFile, targetFile)
+        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
       case "file" =>
         // In the case of a local file, copy the local file to the target directory.
         // Note the difference between uri vs url.
         val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
-        var shouldCopy = true
-        if (targetFile.exists) {
-          if (!Files.equal(sourceFile, targetFile)) {
-            if (fileOverwrite) {
-              targetFile.delete()
-              logInfo(("File %s exists and does not match contents of %s, " +
-                "replacing it with %s").format(targetFile, url, url))
-            } else {
-              throw new SparkException(
-                "File " + targetFile + " exists and does not match contents of" + " " + url)
-            }
-          } else {
-            // Do nothing if the file contents are the same, i.e. this file has been copied
-            // previously.
-            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
-              + targetFile.getAbsolutePath)
-            shouldCopy = false
-          }
-        }
-
-        if (shouldCopy) {
-          // The file does not exist in the target directory. Copy it there.
-          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-          Files.copy(sourceFile, targetFile)
-        }
+        copyFile(url, sourceFile, targetFile, fileOverwrite)
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val in = fs.open(new Path(uri))
-        val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, closeStreams = true)
-        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          if (fileOverwrite) {
-            targetFile.delete()
-            logInfo(("File %s exists and does not match contents of %s, " +
-              "replacing it with %s").format(targetFile, url, url))
-          } else {
-            tempFile.delete()
-            throw new SparkException(
-              "File " + targetFile + " exists and does not match contents of" + " " + url)
-          }
-        }
-        Files.move(tempFile, targetFile)
+        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org