You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/01 08:34:20 UTC

[1/2] spark git commit: [SPARK-26180][CORE][TEST] Reuse withTempDir function to the SparkCore test case

Repository: spark
Updated Branches:
  refs/heads/master 2f6e88fec -> 327ac83f5


http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f5e912b..901a724 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -295,31 +295,30 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   private val workerConf = new SparkConf()
 
   def testOffsetBytes(isCompressed: Boolean): Unit = {
-    val tmpDir2 = Utils.createTempDir()
-    val suffix = getSuffix(isCompressed)
-    val f1Path = tmpDir2 + "/f1" + suffix
-    writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
-    val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
+    withTempDir { tmpDir2 =>
+      val suffix = getSuffix(isCompressed)
+      val f1Path = tmpDir2 + "/f1" + suffix
+      writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
+      val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
 
-    // Read first few bytes
-    assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
+      // Read first few bytes
+      assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
 
-    // Read some middle bytes
-    assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
+      // Read some middle bytes
+      assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
 
-    // Read last few bytes
-    assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
+      // Read last few bytes
+      assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
 
-    // Read some nonexistent bytes in the beginning
-    assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
+      // Read some nonexistent bytes in the beginning
+      assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
 
-    // Read some nonexistent bytes at the end
-    assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
+      // Read some nonexistent bytes at the end
+      assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
 
-    // Read some nonexistent bytes on both ends
-    assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
-
-    Utils.deleteRecursively(tmpDir2)
+      // Read some nonexistent bytes on both ends
+      assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+    }
   }
 
   test("reading offset bytes of a file") {
@@ -331,41 +330,41 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   }
 
   def testOffsetBytesMultipleFiles(isCompressed: Boolean): Unit = {
-    val tmpDir = Utils.createTempDir()
-    val suffix = getSuffix(isCompressed)
-    val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
-    writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
-    writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
-    writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
-    writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
-    val fileLengths = files.map(Utils.getFileLength(_, workerConf))
-
-    // Read first few bytes in the 1st file
-    assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
+    withTempDir { tmpDir =>
+      val suffix = getSuffix(isCompressed)
+      val files = (1 to 3).map(i =>
+        new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
+      writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
+      writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
+      writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
+      writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
+      val fileLengths = files.map(Utils.getFileLength(_, workerConf))
 
-    // Read bytes within the 1st file
-    assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
+      // Read first few bytes in the 1st file
+      assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
 
-    // Read bytes across 1st and 2nd file
-    assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
+      // Read bytes within the 1st file
+      assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
 
-    // Read bytes across 1st, 2nd and 3rd file
-    assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
+      // Read bytes across 1st and 2nd file
+      assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
 
-    // Read bytes across 3rd and 4th file
-    assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
+      // Read bytes across 1st, 2nd and 3rd file
+      assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
 
-    // Read some nonexistent bytes in the beginning
-    assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
+      // Read bytes across 3rd and 4th file
+      assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
 
-    // Read some nonexistent bytes at the end
-    assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
+      // Read some nonexistent bytes in the beginning
+      assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
 
-    // Read some nonexistent bytes on both ends
-    assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
-      "0123456789abcdefghijABCDEFGHIJ9876543210")
+      // Read some nonexistent bytes at the end
+      assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
 
-    Utils.deleteRecursively(tmpDir)
+      // Read some nonexistent bytes on both ends
+      assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
+        "0123456789abcdefghijABCDEFGHIJ9876543210")
+    }
   }
 
   test("reading offset bytes across multiple files") {
@@ -427,27 +426,28 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
   test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
-    val parent: File = Utils.createTempDir()
-    // The parent directory has two child directories
-    val child1: File = Utils.createTempDir(parent.getCanonicalPath)
-    val child2: File = Utils.createTempDir(parent.getCanonicalPath)
-    val child3: File = Utils.createTempDir(child1.getCanonicalPath)
-    // set the last modified time of child1 to 30 secs old
-    child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
-
-    // although child1 is old, child2 is still new so return true
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
-    child2.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
-    parent.setLastModified(System.currentTimeMillis - (1000 * 30))
-    // although parent and its immediate children are new, child3 is still old
-    // we expect a full recursive search for new files.
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
-    child3.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    withTempDir { parent =>
+      // The parent directory has two child directories
+      val child1: File = Utils.createTempDir(parent.getCanonicalPath)
+      val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+      val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+      // set the last modified time of child1 to 30 secs old
+      child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
+
+      // although child1 is old, child2 is still new so return true
+      assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+      child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+      assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+      parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+      // although parent and its immediate children are old, child3 is still new
+      // we expect a full recursive search for new files.
+      assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+      child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+      assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    }
   }
 
   test("resolveURI") {
@@ -608,9 +608,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   }
 
   test("loading properties from file") {
-    val tmpDir = Utils.createTempDir()
-    val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
-    try {
+    withTempDir { tmpDir =>
+      val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
       System.setProperty("spark.test.fileNameLoadB", "2")
       Files.write("spark.test.fileNameLoadA true\n" +
         "spark.test.fileNameLoadB 1\n", outFile, StandardCharsets.UTF_8)
@@ -621,8 +620,6 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       val sparkConf = new SparkConf
       assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
       assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
-    } finally {
-      Utils.deleteRecursively(tmpDir)
     }
   }
 
@@ -638,52 +635,53 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   }
 
   test("fetch hcfs dir") {
-    val tempDir = Utils.createTempDir()
-    val sourceDir = new File(tempDir, "source-dir")
-    sourceDir.mkdir()
-    val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
-    val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
-    val targetDir = new File(tempDir, "target-dir")
-    Files.write("some text", sourceFile, StandardCharsets.UTF_8)
-
-    val path =
-      if (Utils.isWindows) {
-        new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/"))
-      } else {
-        new Path("file://" + sourceDir.getAbsolutePath)
-      }
-    val conf = new Configuration()
-    val fs = Utils.getHadoopFileSystem(path.toString, conf)
+    withTempDir { tempDir =>
+      val sourceDir = new File(tempDir, "source-dir")
+      sourceDir.mkdir()
+      val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
+      val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
+      val targetDir = new File(tempDir, "target-dir")
+      Files.write("some text", sourceFile, StandardCharsets.UTF_8)
+
+      val path =
+        if (Utils.isWindows) {
+          new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/"))
+        } else {
+          new Path("file://" + sourceDir.getAbsolutePath)
+        }
+      val conf = new Configuration()
+      val fs = Utils.getHadoopFileSystem(path.toString, conf)
 
-    assert(!targetDir.isDirectory())
-    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
-    assert(targetDir.isDirectory())
+      assert(!targetDir.isDirectory())
+      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+      assert(targetDir.isDirectory())
 
-    // Copy again to make sure it doesn't error if the dir already exists.
-    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+      // Copy again to make sure it doesn't error if the dir already exists.
+      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
 
-    val destDir = new File(targetDir, sourceDir.getName())
-    assert(destDir.isDirectory())
+      val destDir = new File(targetDir, sourceDir.getName())
+      assert(destDir.isDirectory())
 
-    val destInnerDir = new File(destDir, innerSourceDir.getName)
-    assert(destInnerDir.isDirectory())
+      val destInnerDir = new File(destDir, innerSourceDir.getName)
+      assert(destInnerDir.isDirectory())
 
-    val destInnerFile = new File(destInnerDir, sourceFile.getName)
-    assert(destInnerFile.isFile())
+      val destInnerFile = new File(destInnerDir, sourceFile.getName)
+      assert(destInnerFile.isFile())
 
-    val filePath =
-      if (Utils.isWindows) {
-        new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/"))
-      } else {
-        new Path("file://" + sourceFile.getAbsolutePath)
-      }
-    val testFileDir = new File(tempDir, "test-filename")
-    val testFileName = "testFName"
-    val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
-    Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
-                        conf, false, Some(testFileName))
-    val newFileName = new File(testFileDir, testFileName)
-    assert(newFileName.isFile())
+      val filePath =
+        if (Utils.isWindows) {
+          new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/"))
+        } else {
+          new Path("file://" + sourceFile.getAbsolutePath)
+        }
+      val testFileDir = new File(tempDir, "test-filename")
+      val testFileName = "testFName"
+      val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+      Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+        conf, false, Some(testFileName))
+      val newFileName = new File(testFileDir, testFileName)
+      assert(newFileName.isFile())
+    }
   }
 
   test("shutdown hook manager") {

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 2341949..85963ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -67,6 +67,17 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
   }
 
   /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected override def withTempDir(f: File => Unit): Unit = {
+    super.withTempDir { dir =>
+      f(dir)
+      waitForTasksToFinish()
+    }
+  }
+
+  /**
    * A helper function for turning off/on codegen.
    */
   protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = {
@@ -143,43 +154,6 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
       test(name) { runOnThread() }
     }
   }
-}
-
-/**
- * Helper trait that can be extended by all external SQL test suites.
- *
- * This allows subclasses to plugin a custom `SQLContext`.
- * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`.
- *
- * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is
- * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
- */
-private[sql] trait SQLTestUtilsBase
-  extends Eventually
-  with BeforeAndAfterAll
-  with SQLTestData
-  with PlanTestBase { self: Suite =>
-
-  protected def sparkContext = spark.sparkContext
-
-  // Shorthand for running a query using our SQLContext
-  protected lazy val sql = spark.sql _
-
-  /**
-   * A helper object for importing SQL implicits.
-   *
-   * Note that the alternative of importing `spark.implicits._` is not possible here.
-   * This is because we create the `SQLContext` immediately before the first test is run,
-   * but the implicits import is needed in the constructor.
-   */
-  protected object testImplicits extends SQLImplicits {
-    protected override def _sqlContext: SQLContext = self.spark.sqlContext
-  }
-
-  protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
-    SparkSession.setActiveSession(spark)
-    super.withSQLConf(pairs: _*)(f)
-  }
 
   /**
    * Copy file in jar's resource to a temp file, then pass it to `f`.
@@ -207,21 +181,6 @@ private[sql] trait SQLTestUtilsBase
   }
 
   /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally {
-      // wait for all tasks to finish before deleting files
-      waitForTasksToFinish()
-      Utils.deleteRecursively(dir)
-    }
-  }
-
-  /**
    * Creates the specified number of temporary directories, which is then passed to `f` and will be
    * deleted after `f` returns.
    */
@@ -233,6 +192,43 @@ private[sql] trait SQLTestUtilsBase
       files.foreach(Utils.deleteRecursively)
     }
   }
+}
+
+/**
+ * Helper trait that can be extended by all external SQL test suites.
+ *
+ * This allows subclasses to plugin a custom `SQLContext`.
+ * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`.
+ *
+ * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is
+ * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
+ */
+private[sql] trait SQLTestUtilsBase
+  extends Eventually
+  with BeforeAndAfterAll
+  with SQLTestData
+  with PlanTestBase { self: Suite =>
+
+  protected def sparkContext = spark.sparkContext
+
+  // Shorthand for running a query using our SQLContext
+  protected lazy val sql = spark.sql _
+
+  /**
+   * A helper object for importing SQL implicits.
+   *
+   * Note that the alternative of importing `spark.implicits._` is not possible here.
+   * This is because we create the `SQLContext` immediately before the first test is run,
+   * but the implicits import is needed in the constructor.
+   */
+  protected object testImplicits extends SQLImplicits {
+    protected override def _sqlContext: SQLContext = self.spark.sqlContext
+  }
+
+  protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    SparkSession.setActiveSession(spark)
+    super.withSQLConf(pairs: _*)(f)
+  }
 
   /**
    * Drops functions after calling `f`. A function is represented by (functionName, isTemporary).

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dc96ec4..218bd18 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -56,15 +56,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
   import HiveClientBuilder.buildClient
 
   /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
-  }
-
-  /**
    * Drops table `tableName` after calling `f`.
    */
   protected def withTable(tableNames: String*)(f: => Unit): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index ada494e..6a0f523 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -557,16 +557,4 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
       verifyOutput[W](output.toSeq, expectedOutput, useSet)
     }
   }
-
-  /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   * (originally from `SqlTestUtils`.)
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-26180][CORE][TEST] Reuse withTempDir function to the SparkCore test case

Posted by gu...@apache.org.
[SPARK-26180][CORE][TEST] Reuse withTempDir function to the SparkCore test case

## What changes were proposed in this pull request?

Currently, the common `withTempDir` function is used in Spark SQL test cases to handle the `val dir = Utils.createTempDir()` and `Utils.deleteRecursively(dir)` boilerplate. Unfortunately, the `withTempDir` function cannot be used in Spark Core test cases. This PR shares the `withTempDir` function between Spark SQL and Spark Core in order to clean up the Spark Core test cases. Thanks.

## How was this patch tested?

N / A

Closes #23151 from heary-cao/withCreateTempDir.

Authored-by: caoxuewen <ca...@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/327ac83f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/327ac83f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/327ac83f

Branch: refs/heads/master
Commit: 327ac83f5cf33c84775a95442862bea56d8a0005
Parents: 2f6e88f
Author: caoxuewen <ca...@zte.com.cn>
Authored: Sat Dec 1 16:34:11 2018 +0800
Committer: Hyukjin Kwon <gu...@apache.org>
Committed: Sat Dec 1 16:34:11 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/CheckpointSuite.scala      |   5 +-
 .../org/apache/spark/ContextCleanerSuite.scala  |  97 ++--
 .../test/scala/org/apache/spark/FileSuite.scala |  19 +-
 .../org/apache/spark/SparkContextSuite.scala    | 315 +++++------
 .../scala/org/apache/spark/SparkFunSuite.scala  |  12 +-
 .../spark/api/python/PythonBroadcastSuite.scala |   5 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 539 ++++++++++---------
 .../deploy/history/FsHistoryProviderSuite.scala |  89 +--
 .../history/HistoryServerArgumentsSuite.scala   |   8 +-
 .../deploy/master/PersistenceEngineSuite.scala  |   5 +-
 .../input/WholeTextFileInputFormatSuite.scala   |   6 +-
 .../input/WholeTextFileRecordReaderSuite.scala  |  62 ++-
 .../spark/rdd/PairRDDFunctionsSuite.scala       |   5 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      | 113 ++--
 .../spark/scheduler/DAGSchedulerSuite.scala     |  22 +-
 ...utputCommitCoordinatorIntegrationSuite.scala |   5 +-
 .../spark/serializer/KryoSerializerSuite.scala  |  42 +-
 .../apache/spark/storage/DiskStoreSuite.scala   |   1 -
 .../util/PeriodicRDDCheckpointerSuite.scala     |  52 +-
 .../org/apache/spark/util/UtilsSuite.scala      | 222 ++++----
 .../apache/spark/sql/test/SQLTestUtils.scala    | 100 ++--
 .../spark/sql/hive/client/VersionsSuite.scala   |   9 -
 .../apache/spark/streaming/TestSuiteBase.scala  |  12 -
 23 files changed, 858 insertions(+), 887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 48408cc..6d9e47c 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -586,8 +586,7 @@ object CheckpointSuite {
 class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
 
   test("checkpoint compression") {
-    val checkpointDir = Utils.createTempDir()
-    try {
+    withTempDir { checkpointDir =>
       val conf = new SparkConf()
         .set("spark.checkpoint.compress", "true")
         .set("spark.ui.enabled", "false")
@@ -616,8 +615,6 @@ class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
 
       // Verify that the compressed content can be read back
       assert(rdd.collect().toSeq === (1 to 20))
-    } finally {
-      Utils.deleteRecursively(checkpointDir)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 6724af9..1fcc975 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -207,54 +207,55 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
   }
 
   test("automatically cleanup normal checkpoint") {
-    val checkpointDir = Utils.createTempDir()
-    checkpointDir.delete()
-    var rdd = newPairRDD()
-    sc.setCheckpointDir(checkpointDir.toString)
-    rdd.checkpoint()
-    rdd.cache()
-    rdd.collect()
-    var rddId = rdd.id
-
-    // Confirm the checkpoint directory exists
-    assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
-    val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
-    val fs = path.getFileSystem(sc.hadoopConfiguration)
-    assert(fs.exists(path))
-
-    // the checkpoint is not cleaned by default (without the configuration set)
-    var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
-    rdd = null // Make RDD out of scope, ok if collected earlier
-    runGC()
-    postGCTester.assertCleanup()
-    assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
-    // Verify that checkpoints are NOT cleaned up if the config is not enabled
-    sc.stop()
-    val conf = new SparkConf()
-      .setMaster("local[2]")
-      .setAppName("cleanupCheckpoint")
-      .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
-    sc = new SparkContext(conf)
-    rdd = newPairRDD()
-    sc.setCheckpointDir(checkpointDir.toString)
-    rdd.checkpoint()
-    rdd.cache()
-    rdd.collect()
-    rddId = rdd.id
-
-    // Confirm the checkpoint directory exists
-    assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
-    // Reference rdd to defeat any early collection by the JVM
-    rdd.count()
-
-    // Test that GC causes checkpoint data cleanup after dereferencing the RDD
-    postGCTester = new CleanerTester(sc, Seq(rddId))
-    rdd = null // Make RDD out of scope
-    runGC()
-    postGCTester.assertCleanup()
-    assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+    withTempDir { checkpointDir =>
+      checkpointDir.delete()
+      var rdd = newPairRDD()
+      sc.setCheckpointDir(checkpointDir.toString)
+      rdd.checkpoint()
+      rdd.cache()
+      rdd.collect()
+      var rddId = rdd.id
+
+      // Confirm the checkpoint directory exists
+      assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
+      val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
+      val fs = path.getFileSystem(sc.hadoopConfiguration)
+      assert(fs.exists(path))
+
+      // the checkpoint is not cleaned by default (without the configuration set)
+      var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
+      rdd = null // Make RDD out of scope, ok if collected earlier
+      runGC()
+      postGCTester.assertCleanup()
+      assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+      // Verify that checkpoints are NOT cleaned up if the config is not enabled
+      sc.stop()
+      val conf = new SparkConf()
+        .setMaster("local[2]")
+        .setAppName("cleanupCheckpoint")
+        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
+      sc = new SparkContext(conf)
+      rdd = newPairRDD()
+      sc.setCheckpointDir(checkpointDir.toString)
+      rdd.checkpoint()
+      rdd.cache()
+      rdd.collect()
+      rddId = rdd.id
+
+      // Confirm the checkpoint directory exists
+      assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+      // Reference rdd to defeat any early collection by the JVM
+      rdd.count()
+
+      // Test that GC causes checkpoint data cleanup after dereferencing the RDD
+      postGCTester = new CleanerTester(sc, Seq(rddId))
+      rdd = null // Make RDD out of scope
+      runGC()
+      postGCTester.assertCleanup()
+      assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+    }
   }
 
   test("automatically clean up local checkpoint") {

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index df04a5e..983a791 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -306,17 +306,18 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       .set("spark.files.openCostInBytes", "0")
       .set("spark.default.parallelism", "1"))
 
-    val tempDir = Utils.createTempDir()
-    val tempDirPath = tempDir.getAbsolutePath
+    withTempDir { tempDir =>
+      val tempDirPath = tempDir.getAbsolutePath
 
-    for (i <- 0 until 8) {
-      val tempFile = new File(tempDir, s"part-0000$i")
-      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
-        StandardCharsets.UTF_8)
-    }
+      for (i <- 0 until 8) {
+        val tempFile = new File(tempDir, s"part-0000$i")
+        Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
+          StandardCharsets.UTF_8)
+      }
 
-    for (p <- Seq(1, 2, 8)) {
-      assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+      for (p <- Seq(1, 2, 8)) {
+        assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 79192f3..ec4c7ef 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -116,56 +116,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   }
 
   test("basic case for addFile and listFiles") {
-    val dir = Utils.createTempDir()
-
-    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
-    val absolutePath1 = file1.getAbsolutePath
-
-    val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
-    val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
-    val absolutePath2 = file2.getAbsolutePath
-
-    try {
-      Files.write("somewords1", file1, StandardCharsets.UTF_8)
-      Files.write("somewords2", file2, StandardCharsets.UTF_8)
-      val length1 = file1.length()
-      val length2 = file2.length()
-
-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      sc.addFile(file1.getAbsolutePath)
-      sc.addFile(relativePath)
-      sc.parallelize(Array(1), 1).map(x => {
-        val gotten1 = new File(SparkFiles.get(file1.getName))
-        val gotten2 = new File(SparkFiles.get(file2.getName))
-        if (!gotten1.exists()) {
-          throw new SparkException("file doesn't exist : " + absolutePath1)
-        }
-        if (!gotten2.exists()) {
-          throw new SparkException("file doesn't exist : " + absolutePath2)
-        }
+    withTempDir { dir =>
+      val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
+      val absolutePath1 = file1.getAbsolutePath
+
+      val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
+      val relativePath = file2.getParent + "/../" + file2.getParentFile.getName +
+        "/" + file2.getName
+      val absolutePath2 = file2.getAbsolutePath
+
+      try {
+        Files.write("somewords1", file1, StandardCharsets.UTF_8)
+        Files.write("somewords2", file2, StandardCharsets.UTF_8)
+        val length1 = file1.length()
+        val length2 = file2.length()
+
+        sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+        sc.addFile(file1.getAbsolutePath)
+        sc.addFile(relativePath)
+        sc.parallelize(Array(1), 1).map(x => {
+          val gotten1 = new File(SparkFiles.get(file1.getName))
+          val gotten2 = new File(SparkFiles.get(file2.getName))
+          if (!gotten1.exists()) {
+            throw new SparkException("file doesn't exist : " + absolutePath1)
+          }
+          if (!gotten2.exists()) {
+            throw new SparkException("file doesn't exist : " + absolutePath2)
+          }
 
-        if (length1 != gotten1.length()) {
-          throw new SparkException(
-            s"file has different length $length1 than added file ${gotten1.length()} : " +
-              absolutePath1)
-        }
-        if (length2 != gotten2.length()) {
-          throw new SparkException(
-            s"file has different length $length2 than added file ${gotten2.length()} : " +
-              absolutePath2)
-        }
+          if (length1 != gotten1.length()) {
+            throw new SparkException(
+              s"file has different length $length1 than added file ${gotten1.length()} : " +
+                absolutePath1)
+          }
+          if (length2 != gotten2.length()) {
+            throw new SparkException(
+              s"file has different length $length2 than added file ${gotten2.length()} : " +
+                absolutePath2)
+          }
 
-        if (absolutePath1 == gotten1.getAbsolutePath) {
-          throw new SparkException("file should have been copied :" + absolutePath1)
-        }
-        if (absolutePath2 == gotten2.getAbsolutePath) {
-          throw new SparkException("file should have been copied : " + absolutePath2)
-        }
-        x
-      }).count()
-      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
-    } finally {
-      sc.stop()
+          if (absolutePath1 == gotten1.getAbsolutePath) {
+            throw new SparkException("file should have been copied :" + absolutePath1)
+          }
+          if (absolutePath2 == gotten2.getAbsolutePath) {
+            throw new SparkException("file should have been copied : " + absolutePath2)
+          }
+          x
+        }).count()
+        assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+      } finally {
+        sc.stop()
+      }
     }
   }
 
@@ -202,51 +203,51 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   }
 
   test("addFile recursive works") {
-    val pluto = Utils.createTempDir()
-    val neptune = Utils.createTempDir(pluto.getAbsolutePath)
-    val saturn = Utils.createTempDir(neptune.getAbsolutePath)
-    val alien1 = File.createTempFile("alien", "1", neptune)
-    val alien2 = File.createTempFile("alien", "2", saturn)
-
-    try {
-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      sc.addFile(neptune.getAbsolutePath, true)
-      sc.parallelize(Array(1), 1).map(x => {
-        val sep = File.separator
-        if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
-          throw new SparkException("can't access file under root added directory")
-        }
-        if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName))
-            .exists()) {
-          throw new SparkException("can't access file in nested directory")
-        }
-        if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName))
-            .exists()) {
-          throw new SparkException("file exists that shouldn't")
-        }
-        x
-      }).count()
-    } finally {
-      sc.stop()
+    withTempDir { pluto =>
+      val neptune = Utils.createTempDir(pluto.getAbsolutePath)
+      val saturn = Utils.createTempDir(neptune.getAbsolutePath)
+      val alien1 = File.createTempFile("alien", "1", neptune)
+      val alien2 = File.createTempFile("alien", "2", saturn)
+
+      try {
+        sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+        sc.addFile(neptune.getAbsolutePath, true)
+        sc.parallelize(Array(1), 1).map(x => {
+          val sep = File.separator
+          if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
+            throw new SparkException("can't access file under root added directory")
+          }
+          if (!new File(SparkFiles.get(
+            neptune.getName + sep + saturn.getName + sep + alien2.getName)).exists()) {
+            throw new SparkException("can't access file in nested directory")
+          }
+          if (new File(SparkFiles.get(
+            pluto.getName + sep + neptune.getName + sep + alien1.getName)).exists()) {
+            throw new SparkException("file exists that shouldn't")
+          }
+          x
+        }).count()
+      } finally {
+        sc.stop()
+      }
     }
   }
 
   test("addFile recursive can't add directories by default") {
-    val dir = Utils.createTempDir()
-
-    try {
-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      intercept[SparkException] {
-        sc.addFile(dir.getAbsolutePath)
+    withTempDir { dir =>
+      try {
+        sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+        intercept[SparkException] {
+          sc.addFile(dir.getAbsolutePath)
+        }
+      } finally {
+        sc.stop()
       }
-    } finally {
-      sc.stop()
     }
   }
 
   test("cannot call addFile with different paths that have the same filename") {
-    val dir = Utils.createTempDir()
-    try {
+    withTempDir { dir =>
       val subdir1 = new File(dir, "subdir1")
       val subdir2 = new File(dir, "subdir2")
       assert(subdir1.mkdir())
@@ -267,8 +268,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
         sc.addFile(file2.getAbsolutePath)
       }
       assert(getAddedFileContents() === "old")
-    } finally {
-      Utils.deleteRecursively(dir)
     }
   }
 
@@ -296,30 +295,33 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   }
 
   test("add jar with invalid path") {
-    val tmpDir = Utils.createTempDir()
-    val tmpJar = File.createTempFile("test", ".jar", tmpDir)
+    withTempDir { tmpDir =>
+      val tmpJar = File.createTempFile("test", ".jar", tmpDir)
 
-    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-    sc.addJar(tmpJar.getAbsolutePath)
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addJar(tmpJar.getAbsolutePath)
 
-    // Invalid jar path will only print the error log, will not add to file server.
-    sc.addJar("dummy.jar")
-    sc.addJar("")
-    sc.addJar(tmpDir.getAbsolutePath)
+      // Invalid jar path will only print the error log, will not add to file server.
+      sc.addJar("dummy.jar")
+      sc.addJar("")
+      sc.addJar(tmpDir.getAbsolutePath)
 
-    assert(sc.listJars().size == 1)
-    assert(sc.listJars().head.contains(tmpJar.getName))
+      assert(sc.listJars().size == 1)
+      assert(sc.listJars().head.contains(tmpJar.getName))
+    }
   }
 
   test("SPARK-22585 addJar argument without scheme is interpreted literally without url decoding") {
-    val tmpDir = new File(Utils.createTempDir(), "host%3A443")
-    tmpDir.mkdirs()
-    val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir)
+    withTempDir { dir =>
+      val tmpDir = new File(dir, "host%3A443")
+      tmpDir.mkdirs()
+      val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir)
 
-    sc = new SparkContext("local", "test")
+      sc = new SparkContext("local", "test")
 
-    sc.addJar(tmpJar.getAbsolutePath)
-    assert(sc.listJars().size === 1)
+      sc.addJar(tmpJar.getAbsolutePath)
+      assert(sc.listJars().size === 1)
+    }
   }
 
   test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
@@ -340,60 +342,61 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
     // Regression test for SPARK-7155
     // dir1 and dir2 are used for wholeTextFiles and binaryFiles
-    val dir1 = Utils.createTempDir()
-    val dir2 = Utils.createTempDir()
-
-    val dirpath1 = dir1.getAbsolutePath
-    val dirpath2 = dir2.getAbsolutePath
-
-    // file1 and file2 are placed inside dir1, they are also used for
-    // textFile, hadoopFile, and newAPIHadoopFile
-    // file3, file4 and file5 are placed inside dir2, they are used for
-    // textFile, hadoopFile, and newAPIHadoopFile as well
-    val file1 = new File(dir1, "part-00000")
-    val file2 = new File(dir1, "part-00001")
-    val file3 = new File(dir2, "part-00000")
-    val file4 = new File(dir2, "part-00001")
-    val file5 = new File(dir2, "part-00002")
-
-    val filepath1 = file1.getAbsolutePath
-    val filepath2 = file2.getAbsolutePath
-    val filepath3 = file3.getAbsolutePath
-    val filepath4 = file4.getAbsolutePath
-    val filepath5 = file5.getAbsolutePath
-
-
-    try {
-      // Create 5 text files.
-      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
-        StandardCharsets.UTF_8)
-      Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8)
-      Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
-      Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8)
-      Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8)
-
-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-
-      // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
-      assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
-      assert(sc.hadoopFile(filepath1 + "," + filepath2,
-        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
-      assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
-        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
-
-      // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
-      assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
-      assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
-               classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
-      assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
-               classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
-
-      // Test wholeTextFiles, and binaryFiles for dir1 and dir2
-      assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
-      assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
-
-    } finally {
-      sc.stop()
+    withTempDir { dir1 =>
+      withTempDir { dir2 =>
+        val dirpath1 = dir1.getAbsolutePath
+        val dirpath2 = dir2.getAbsolutePath
+
+        // file1 and file2 are placed inside dir1, they are also used for
+        // textFile, hadoopFile, and newAPIHadoopFile
+        // file3, file4 and file5 are placed inside dir2, they are used for
+        // textFile, hadoopFile, and newAPIHadoopFile as well
+        val file1 = new File(dir1, "part-00000")
+        val file2 = new File(dir1, "part-00001")
+        val file3 = new File(dir2, "part-00000")
+        val file4 = new File(dir2, "part-00001")
+        val file5 = new File(dir2, "part-00002")
+
+        val filepath1 = file1.getAbsolutePath
+        val filepath2 = file2.getAbsolutePath
+        val filepath3 = file3.getAbsolutePath
+        val filepath4 = file4.getAbsolutePath
+        val filepath5 = file5.getAbsolutePath
+
+
+        try {
+          // Create 5 text files.
+          Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
+            StandardCharsets.UTF_8)
+          Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8)
+          Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
+          Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8)
+          Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8)
+
+          sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+          // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
+          assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
+          assert(sc.hadoopFile(filepath1 + "," + filepath2,
+            classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+          assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
+            classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+          // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
+          assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
+          assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+            classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+          assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+            classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+          // Test wholeTextFiles, and binaryFiles for dir1 and dir2
+          assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
+          assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
+
+        } finally {
+          sc.stop()
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3128902..dad24d7 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,7 +23,7 @@ import java.io.File
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, Utils}
 
 /**
  * Base abstract class for all unit tests in Spark for handling common functionality.
@@ -106,4 +106,14 @@ abstract class SparkFunSuite
     }
   }
 
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir()
+    try f(dir) finally {
+      Utils.deleteRecursively(dir)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index b38a366..7407a65 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.util.Utils
 // a PythonBroadcast:
 class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkContext {
   test("PythonBroadcast can be serialized with Kryo (SPARK-4882)") {
-    val tempDir = Utils.createTempDir()
     val broadcastedString = "Hello, world!"
     def assertBroadcastIsValid(broadcast: PythonBroadcast): Unit = {
       val source = Source.fromFile(broadcast.path)
@@ -39,7 +38,7 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
       source.close()
       contents should be (broadcastedString)
     }
-    try {
+    withTempDir { tempDir =>
       val broadcastDataFile: File = {
         val file = new File(tempDir, "broadcastData")
         val printWriter = new PrintWriter(file)
@@ -53,8 +52,6 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
       val deserializedBroadcast =
         Utils.clone[PythonBroadcast](broadcast, new KryoSerializer(conf).newInstance())
       assertBroadcastIsValid(deserializedBroadcast)
-    } finally {
-      Utils.deleteRecursively(tempDir)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c093789..a8973d1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -494,13 +494,11 @@ class SparkSubmitSuite
   }
 
   test("launch simple application with spark-submit with redaction") {
-    val testDir = Utils.createTempDir()
-    testDir.deleteOnExit()
-    val testDirPath = new Path(testDir.getAbsolutePath())
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val fileSystem = Utils.getHadoopFileSystem("/",
       SparkHadoopUtil.get.newConfiguration(new SparkConf()))
-    try {
+    withTempDir { testDir =>
+      val testDirPath = new Path(testDir.getAbsolutePath())
       val args = Seq(
         "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
         "--name", "testApp",
@@ -519,8 +517,6 @@ class SparkSubmitSuite
       Source.fromInputStream(logData).getLines().foreach { line =>
         assert(!line.contains("secret_password"))
       }
-    } finally {
-      Utils.deleteRecursively(testDir)
     }
   }
 
@@ -614,108 +610,112 @@ class SparkSubmitSuite
     assert(new File(rScriptDir).exists)
 
     // compile a small jar containing a class that will be called from R code.
-    val tempDir = Utils.createTempDir()
-    val srcDir = new File(tempDir, "sparkrtest")
-    srcDir.mkdirs()
-    val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
-      """package sparkrtest;
+    withTempDir { tempDir =>
+      val srcDir = new File(tempDir, "sparkrtest")
+      srcDir.mkdirs()
+      val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
+        """package sparkrtest;
         |
         |public class DummyClass implements java.io.Serializable {
         |  public static String helloWorld(String arg) { return "Hello " + arg; }
         |  public static int addStuff(int arg1, int arg2) { return arg1 + arg2; }
         |}
-      """.stripMargin)
-    val excFile = TestUtils.createCompiledClass("DummyClass", srcDir, excSource, Seq.empty)
-    val jarFile = new File(tempDir, "sparkRTestJar-%s.jar".format(System.currentTimeMillis()))
-    val jarURL = TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("sparkrtest"))
+      """.
+          stripMargin)
+      val excFile = TestUtils.createCompiledClass("DummyClass", srcDir, excSource, Seq.empty)
+      val jarFile = new File(tempDir, "sparkRTestJar-%s.jar".format(System.currentTimeMillis()))
+      val jarURL = TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("sparkrtest"))
 
-    val args = Seq(
-      "--name", "testApp",
-      "--master", "local",
-      "--jars", jarURL.toString,
-      "--verbose",
-      "--conf", "spark.ui.enabled=false",
-      rScriptDir)
-    runSparkSubmit(args)
+      val args = Seq(
+        "--name", "testApp",
+        "--master", "local",
+        "--jars", jarURL.toString,
+        "--verbose",
+        "--conf", "spark.ui.enabled=false",
+        rScriptDir)
+      runSparkSubmit(args)
+    }
   }
 
   test("resolves command line argument paths correctly") {
-    val dir = Utils.createTempDir()
-    val archive = Paths.get(dir.toPath.toString, "single.zip")
-    Files.createFile(archive)
-    val jars = "/jar1,/jar2"
-    val files = "local:/file1,file2"
-    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
-    val pyFiles = "py-file1,py-file2"
-
-    // Test jars and files
-    val clArgs = Seq(
-      "--master", "local",
-      "--class", "org.SomeClass",
-      "--jars", jars,
-      "--files", files,
-      "thejar.jar")
-    val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
-    appArgs.jars should be (Utils.resolveURIs(jars))
-    appArgs.files should be (Utils.resolveURIs(files))
-    conf.get("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
-    conf.get("spark.files") should be (Utils.resolveURIs(files))
-
-    // Test files and archives (Yarn)
-    val clArgs2 = Seq(
-      "--master", "yarn",
-      "--class", "org.SomeClass",
-      "--files", files,
-      "--archives", archives,
-      "thejar.jar"
-    )
-    val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
-    appArgs2.files should be (Utils.resolveURIs(files))
-    appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
-    conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
-    conf2.get("spark.yarn.dist.archives") should fullyMatch regex
-      ("file:/archive1,file:.*#archive3")
-
-    // Test python files
-    val clArgs3 = Seq(
-      "--master", "local",
-      "--py-files", pyFiles,
-      "--conf", "spark.pyspark.driver.python=python3.4",
-      "--conf", "spark.pyspark.python=python3.5",
-      "mister.py"
-    )
-    val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
-    appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
-    conf3.get("spark.submit.pyFiles") should be (
-      PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
-    conf3.get(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
-    conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
+    withTempDir { dir =>
+      val archive = Paths.get(dir.toPath.toString, "single.zip")
+      Files.createFile(archive)
+      val jars = "/jar1,/jar2"
+      val files = "local:/file1,file2"
+      val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+      val pyFiles = "py-file1,py-file2"
+
+      // Test jars and files
+      val clArgs = Seq(
+        "--master", "local",
+        "--class", "org.SomeClass",
+        "--jars", jars,
+        "--files", files,
+        "thejar.jar")
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
+      appArgs.jars should be(Utils.resolveURIs(jars))
+      appArgs.files should be(Utils.resolveURIs(files))
+      conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+      conf.get("spark.files") should be(Utils.resolveURIs(files))
+
+      // Test files and archives (Yarn)
+      val clArgs2 = Seq(
+        "--master", "yarn",
+        "--class", "org.SomeClass",
+        "--files", files,
+        "--archives", archives,
+        "thejar.jar"
+      )
+      val appArgs2 = new SparkSubmitArguments(clArgs2)
+      val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
+      appArgs2.files should be(Utils.resolveURIs(files))
+      appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
+      conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+      conf2.get("spark.yarn.dist.archives") should fullyMatch regex
+        ("file:/archive1,file:.*#archive3")
+
+      // Test python files
+      val clArgs3 = Seq(
+        "--master", "local",
+        "--py-files", pyFiles,
+        "--conf", "spark.pyspark.driver.python=python3.4",
+        "--conf", "spark.pyspark.python=python3.5",
+        "mister.py"
+      )
+      val appArgs3 = new SparkSubmitArguments(clArgs3)
+      val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
+      appArgs3.pyFiles should be(Utils.resolveURIs(pyFiles))
+      conf3.get("spark.submit.pyFiles") should be(
+        PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+      conf3.get(PYSPARK_DRIVER_PYTHON.key) should be("python3.4")
+      conf3.get(PYSPARK_PYTHON.key) should be("python3.5")
+    }
   }
 
   test("ambiguous archive mapping results in error message") {
-    val dir = Utils.createTempDir()
-    val archive1 = Paths.get(dir.toPath.toString, "first.zip")
-    val archive2 = Paths.get(dir.toPath.toString, "second.zip")
-    Files.createFile(archive1)
-    Files.createFile(archive2)
-    val jars = "/jar1,/jar2"
-    val files = "local:/file1,file2"
-    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
-    val pyFiles = "py-file1,py-file2"
-
-    // Test files and archives (Yarn)
-    val clArgs2 = Seq(
-      "--master", "yarn",
-      "--class", "org.SomeClass",
-      "--files", files,
-      "--archives", archives,
-      "thejar.jar"
-    )
+    withTempDir { dir =>
+      val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+      val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+      Files.createFile(archive1)
+      Files.createFile(archive2)
+      val jars = "/jar1,/jar2"
+      val files = "local:/file1,file2"
+      val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+      val pyFiles = "py-file1,py-file2"
+
+      // Test files and archives (Yarn)
+      val clArgs2 = Seq(
+        "--master", "yarn",
+        "--class", "org.SomeClass",
+        "--files", files,
+        "--archives", archives,
+        "thejar.jar"
+      )
 
-    testPrematureExit(clArgs2.toArray, "resolves ambiguously to multiple files")
+      testPrematureExit(clArgs2.toArray, "resolves ambiguously to multiple files")
+    }
   }
 
   test("resolves config paths correctly") {
@@ -724,77 +724,77 @@ class SparkSubmitSuite
     val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
     val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
 
-    val tmpDir = Utils.createTempDir()
-
-    // Test jars and files
-    val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
-    val writer1 = new PrintWriter(f1)
-    writer1.println("spark.jars " + jars)
-    writer1.println("spark.files " + files)
-    writer1.close()
-    val clArgs = Seq(
-      "--master", "local",
-      "--class", "org.SomeClass",
-      "--properties-file", f1.getPath,
-      "thejar.jar"
-    )
-    val appArgs = new SparkSubmitArguments(clArgs)
-    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
-    conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
-    conf.get("spark.files") should be(Utils.resolveURIs(files))
-
-    // Test files and archives (Yarn)
-    val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
-    val writer2 = new PrintWriter(f2)
-    writer2.println("spark.yarn.dist.files " + files)
-    writer2.println("spark.yarn.dist.archives " + archives)
-    writer2.close()
-    val clArgs2 = Seq(
-      "--master", "yarn",
-      "--class", "org.SomeClass",
-      "--properties-file", f2.getPath,
-      "thejar.jar"
-    )
-    val appArgs2 = new SparkSubmitArguments(clArgs2)
-    val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
-    conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
-    conf2.get("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
-
-    // Test python files
-    val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
-    val writer3 = new PrintWriter(f3)
-    writer3.println("spark.submit.pyFiles " + pyFiles)
-    writer3.close()
-    val clArgs3 = Seq(
-      "--master", "local",
-      "--properties-file", f3.getPath,
-      "mister.py"
-    )
-    val appArgs3 = new SparkSubmitArguments(clArgs3)
-    val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
-    conf3.get("spark.submit.pyFiles") should be(
-      PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
-
-    // Test remote python files
-    val hadoopConf = new Configuration()
-    updateConfWithFakeS3Fs(hadoopConf)
-    val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
-    val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
-    val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
-    val writer4 = new PrintWriter(f4)
-    val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
-    writer4.println("spark.submit.pyFiles " + remotePyFiles)
-    writer4.close()
-    val clArgs4 = Seq(
-      "--master", "yarn",
-      "--deploy-mode", "cluster",
-      "--properties-file", f4.getPath,
-      "hdfs:///tmp/mister.py"
-    )
-    val appArgs4 = new SparkSubmitArguments(clArgs4)
-    val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
-    // Should not format python path for yarn cluster mode
-    conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
+    withTempDir { tmpDir =>
+      // Test jars and files
+      val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
+      val writer1 = new PrintWriter(f1)
+      writer1.println("spark.jars " + jars)
+      writer1.println("spark.files " + files)
+      writer1.close()
+      val clArgs = Seq(
+        "--master", "local",
+        "--class", "org.SomeClass",
+        "--properties-file", f1.getPath,
+        "thejar.jar"
+      )
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
+      conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+      conf.get("spark.files") should be(Utils.resolveURIs(files))
+
+      // Test files and archives (Yarn)
+      val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
+      val writer2 = new PrintWriter(f2)
+      writer2.println("spark.yarn.dist.files " + files)
+      writer2.println("spark.yarn.dist.archives " + archives)
+      writer2.close()
+      val clArgs2 = Seq(
+        "--master", "yarn",
+        "--class", "org.SomeClass",
+        "--properties-file", f2.getPath,
+        "thejar.jar"
+      )
+      val appArgs2 = new SparkSubmitArguments(clArgs2)
+      val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
+      conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+      conf2.get("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
+
+      // Test python files
+      val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
+      val writer3 = new PrintWriter(f3)
+      writer3.println("spark.submit.pyFiles " + pyFiles)
+      writer3.close()
+      val clArgs3 = Seq(
+        "--master", "local",
+        "--properties-file", f3.getPath,
+        "mister.py"
+      )
+      val appArgs3 = new SparkSubmitArguments(clArgs3)
+      val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
+      conf3.get("spark.submit.pyFiles") should be(
+        PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+
+      // Test remote python files
+      val hadoopConf = new Configuration()
+      updateConfWithFakeS3Fs(hadoopConf)
+      val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
+      val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
+      val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
+      val writer4 = new PrintWriter(f4)
+      val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
+      writer4.println("spark.submit.pyFiles " + remotePyFiles)
+      writer4.close()
+      val clArgs4 = Seq(
+        "--master", "yarn",
+        "--deploy-mode", "cluster",
+        "--properties-file", f4.getPath,
+        "hdfs:///tmp/mister.py"
+      )
+      val appArgs4 = new SparkSubmitArguments(clArgs4)
+      val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
+      // Should not format python path for yarn cluster mode
+      conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
+    }
   }
 
   test("user classpath first in driver") {
@@ -828,46 +828,50 @@ class SparkSubmitSuite
   }
 
   test("support glob path") {
-    val tmpJarDir = Utils.createTempDir()
-    val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
-    val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)
-
-    val tmpFileDir = Utils.createTempDir()
-    val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
-    val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)
-
-    val tmpPyFileDir = Utils.createTempDir()
-    val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
-    val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)
-
-    val tmpArchiveDir = Utils.createTempDir()
-    val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
-    val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
-
-    val tempPyFile = File.createTempFile("tmpApp", ".py")
-    tempPyFile.deleteOnExit()
-
-    val args = Seq(
-      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
-      "--name", "testApp",
-      "--master", "yarn",
-      "--deploy-mode", "client",
-      "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
-      "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
-      "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
-      "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
-      tempPyFile.toURI().toString())
-
-    val appArgs = new SparkSubmitArguments(args)
-    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
-    conf.get("spark.yarn.dist.jars").split(",").toSet should be
-      (Set(jar1.toURI.toString, jar2.toURI.toString))
-    conf.get("spark.yarn.dist.files").split(",").toSet should be
-      (Set(file1.toURI.toString, file2.toURI.toString))
-    conf.get("spark.yarn.dist.pyFiles").split(",").toSet should be
-      (Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath))
-    conf.get("spark.yarn.dist.archives").split(",").toSet should be
-      (Set(archive1.toURI.toString, archive2.toURI.toString))
+    withTempDir { tmpJarDir =>
+      withTempDir { tmpFileDir =>
+        withTempDir { tmpPyFileDir =>
+          withTempDir { tmpArchiveDir =>
+            val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
+            val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)
+
+            val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
+            val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)
+
+            val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
+            val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)
+
+            val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
+            val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
+
+            val tempPyFile = File.createTempFile("tmpApp", ".py")
+            tempPyFile.deleteOnExit()
+
+            val args = Seq(
+              "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+              "--name", "testApp",
+              "--master", "yarn",
+              "--deploy-mode", "client",
+              "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
+              "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
+              "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
+              "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
+              tempPyFile.toURI().toString())
+
+            // `be(` must open on the matcher line; a lone `(Set(...))` line asserts nothing.
+            val (_, _, conf, _) = submit.prepareSubmitEnvironment(new SparkSubmitArguments(args))
+            conf.get("spark.yarn.dist.jars").split(",").toSet should be(
+              Set(jar1.toURI.toString, jar2.toURI.toString))
+            conf.get("spark.yarn.dist.files").split(",").toSet should be(
+              Set(file1.toURI.toString, file2.toURI.toString))
+            conf.get("spark.yarn.dist.pyFiles").split(",").toSet should be(
+              Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath)) // TODO confirm: paths or URIs
+            conf.get("spark.yarn.dist.archives").split(",").toSet should be(
+              Set(archive1.toURI.toString, archive2.toURI.toString))
+          }
+        }
+      }
+    }
   }
 
   // scalastyle:on println
@@ -985,37 +989,38 @@ class SparkSubmitSuite
     val hadoopConf = new Configuration()
     updateConfWithFakeS3Fs(hadoopConf)
 
-    val tmpDir = Utils.createTempDir()
-    val file = File.createTempFile("tmpFile", "", tmpDir)
-    val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
-    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
-    val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
-    val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+    withTempDir { tmpDir =>
+      val file = File.createTempFile("tmpFile", "", tmpDir)
+      val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+      val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+      val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+      val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
 
-    val args = Seq(
-      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
-      "--name", "testApp",
-      "--master", "yarn",
-      "--deploy-mode", "client",
-      "--jars", tmpJarPath,
-      "--files", s"s3a://${file.getAbsolutePath}",
-      "--py-files", s"s3a://${pyFile.getAbsolutePath}",
-      s"s3a://$mainResource"
+      val args = Seq(
+        "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+        "--name", "testApp",
+        "--master", "yarn",
+        "--deploy-mode", "client",
+        "--jars", tmpJarPath,
+        "--files", s"s3a://${file.getAbsolutePath}",
+        "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+        s"s3a://$mainResource"
       )
 
-    val appArgs = new SparkSubmitArguments(args)
-    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
+      val appArgs = new SparkSubmitArguments(args)
+      val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
 
-    // All the resources should still be remote paths, so that YARN client will not upload again.
-    conf.get("spark.yarn.dist.jars") should be (tmpJarPath)
-    conf.get("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
-    conf.get("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
+      // All the resources should still be remote paths, so that YARN client will not upload again.
+      conf.get("spark.yarn.dist.jars") should be(tmpJarPath)
+      conf.get("spark.yarn.dist.files") should be(s"s3a://${file.getAbsolutePath}")
+      conf.get("spark.yarn.dist.pyFiles") should be(s"s3a://${pyFile.getAbsolutePath}")
 
-    // Local repl jars should be a local path.
-    conf.get("spark.repl.local.jars") should (startWith("file:"))
+      // Local repl jars should be a local path.
+      conf.get("spark.repl.local.jars") should (startWith("file:"))
 
-    // local py files should not be a URI format.
-    conf.get("spark.submit.pyFiles") should (startWith("/"))
+      // local py files should not be a URI format.
+      conf.get("spark.submit.pyFiles") should (startWith("/"))
+    }
   }
 
   test("download remote resource if it is not supported by yarn service") {
@@ -1095,18 +1100,13 @@ class SparkSubmitSuite
   }
 
   private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
-    val tmpDir = Utils.createTempDir()
-
-    val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
-    val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf), StandardCharsets.UTF_8)
-    for ((key, value) <- defaults) writer.write(s"$key $value\n")
-
-    writer.close()
-
-    try {
+    withTempDir { tmpDir =>
+      val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
+      val writer =
+        new OutputStreamWriter(new FileOutputStream(defaultsConf), StandardCharsets.UTF_8)
+      for ((key, value) <- defaults) writer.write(s"$key $value\n")
+      writer.close()
       f(tmpDir.getAbsolutePath)
-    } finally {
-      Utils.deleteRecursively(tmpDir)
     }
   }
 
@@ -1134,39 +1134,40 @@ class SparkSubmitSuite
     val hadoopConf = new Configuration()
     updateConfWithFakeS3Fs(hadoopConf)
 
-    val tmpDir = Utils.createTempDir()
-    val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+    withTempDir { tmpDir =>
+      val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
 
-    val args = Seq(
-      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
-      "--name", "testApp",
-      "--master", "yarn",
-      "--deploy-mode", "client",
-      "--py-files", s"s3a://${pyFile.getAbsolutePath}",
-      "spark-internal"
-    )
+      val args = Seq(
+        "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+        "--name", "testApp",
+        "--master", "yarn",
+        "--deploy-mode", "client",
+        "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+        "spark-internal"
+      )
 
-    val appArgs = new SparkSubmitArguments(args)
-    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
+      val appArgs = new SparkSubmitArguments(args)
+      val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
 
-    conf.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
-    conf.get("spark.submit.pyFiles") should (startWith("/"))
+      conf.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
+      conf.get("spark.submit.pyFiles") should (startWith("/"))
 
-    // Verify "spark.submit.pyFiles"
-    val args1 = Seq(
-      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
-      "--name", "testApp",
-      "--master", "yarn",
-      "--deploy-mode", "client",
-      "--conf", s"spark.submit.pyFiles=s3a://${pyFile.getAbsolutePath}",
-      "spark-internal"
-    )
+      // Verify "spark.submit.pyFiles"
+      val args1 = Seq(
+        "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+        "--name", "testApp",
+        "--master", "yarn",
+        "--deploy-mode", "client",
+        "--conf", s"spark.submit.pyFiles=s3a://${pyFile.getAbsolutePath}",
+        "spark-internal"
+      )
 
-    val appArgs1 = new SparkSubmitArguments(args1)
-    val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
+      val appArgs1 = new SparkSubmitArguments(args1)
+      val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
 
-    conf1.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
-    conf1.get("spark.submit.pyFiles") should (startWith("/"))
+      conf1.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
+      conf1.get("spark.submit.pyFiles") should (startWith("/"))
+    }
   }
 
   test("handles natural line delimiters in --properties-file and --conf uniformly") {

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 527c654..c1ae27a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -767,53 +767,54 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   }
 
   test("clean up stale app information") {
-    val storeDir = Utils.createTempDir()
-    val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
-    val clock = new ManualClock()
-    val provider = spy(new FsHistoryProvider(conf, clock))
-    val appId = "new1"
-
-    // Write logs for two app attempts.
-    clock.advance(1)
-    val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
-    writeFile(attempt1, true, None,
-      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
-      SparkListenerJobStart(0, 1L, Nil, null),
-      SparkListenerApplicationEnd(5L)
+    withTempDir { storeDir =>
+      val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
+      val clock = new ManualClock()
+      val provider = spy(new FsHistoryProvider(conf, clock))
+      val appId = "new1"
+
+      // Write logs for two app attempts.
+      clock.advance(1)
+      val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+      writeFile(attempt1, true, None,
+        SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
+        SparkListenerJobStart(0, 1L, Nil, null),
+        SparkListenerApplicationEnd(5L)
       )
-    val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
-    writeFile(attempt2, true, None,
-      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
-      SparkListenerJobStart(0, 1L, Nil, null),
-      SparkListenerApplicationEnd(5L)
+      val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+      writeFile(attempt2, true, None,
+        SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
+        SparkListenerJobStart(0, 1L, Nil, null),
+        SparkListenerApplicationEnd(5L)
       )
-    updateAndCheck(provider) { list =>
-      assert(list.size === 1)
-      assert(list(0).id === appId)
-      assert(list(0).attempts.size === 2)
-    }
-
-    // Load the app's UI.
-    val ui = provider.getAppUI(appId, Some("1"))
-    assert(ui.isDefined)
-
-    // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
-    // attempt 2 still exists, listing data should be there.
-    clock.advance(1)
-    attempt1.delete()
-    updateAndCheck(provider) { list =>
-      assert(list.size === 1)
-      assert(list(0).id === appId)
-      assert(list(0).attempts.size === 1)
-    }
-    assert(!ui.get.valid)
-    assert(provider.getAppUI(appId, None) === None)
+      updateAndCheck(provider) { list =>
+        assert(list.size === 1)
+        assert(list(0).id === appId)
+        assert(list(0).attempts.size === 2)
+      }
 
-    // Delete the second attempt's log file. Now everything should go away.
-    clock.advance(1)
-    attempt2.delete()
-    updateAndCheck(provider) { list =>
-      assert(list.isEmpty)
+      // Load the app's UI.
+      val ui = provider.getAppUI(appId, Some("1"))
+      assert(ui.isDefined)
+
+      // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
+      // attempt 2 still exists, listing data should be there.
+      clock.advance(1)
+      attempt1.delete()
+      updateAndCheck(provider) { list =>
+        assert(list.size === 1)
+        assert(list(0).id === appId)
+        assert(list(0).attempts.size === 1)
+      }
+      assert(!ui.get.valid)
+      assert(provider.getAppUI(appId, None) === None)
+
+      // Delete the second attempt's log file. Now everything should go away.
+      clock.advance(1)
+      attempt2.delete()
+      updateAndCheck(provider) { list =>
+        assert(list.isEmpty)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 3795482..e89733a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -41,18 +41,14 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
   }
 
   test("Properties File Arguments Parsing --properties-file") {
-    val tmpDir = Utils.createTempDir()
-    val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
-    try {
+    withTempDir { tmpDir =>
+      val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
       Files.write("spark.test.CustomPropertyA blah\n" +
         "spark.test.CustomPropertyB notblah\n", outFile, UTF_8)
       val argStrings = Array("--properties-file", outFile.getAbsolutePath)
       val hsa = new HistoryServerArguments(conf, argStrings)
       assert(conf.get("spark.test.CustomPropertyA") === "blah")
       assert(conf.get("spark.test.CustomPropertyB") === "notblah")
-    } finally {
-      Utils.deleteRecursively(tmpDir)
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 62fe0ea..3027865 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -31,14 +31,11 @@ import org.apache.spark.util.Utils
 class PersistenceEngineSuite extends SparkFunSuite {
 
   test("FileSystemPersistenceEngine") {
-    val dir = Utils.createTempDir()
-    try {
+    withTempDir { dir =>
       val conf = new SparkConf()
       testPersistenceEngine(conf, serializer =>
         new FileSystemPersistenceEngine(dir.getAbsolutePath, serializer)
       )
-    } finally {
-      Utils.deleteRecursively(dir)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
index 817dc08..576ca161 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
@@ -59,9 +59,7 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll
 
   test("for small files minimum split size per node and per rack should be less than or equal to " +
     "maximum split size.") {
-    var dir : File = null;
-    try {
-      dir = Utils.createTempDir()
+    withTempDir { dir =>
       logInfo(s"Local disk address is ${dir.toString}.")
 
       // Set the minsize per node and rack to be larger than the size of the input file.
@@ -75,8 +73,6 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll
       }
       // ensure spark job runs successfully without exceptions from the CombineFileInputFormat
       assert(sc.wholeTextFiles(dir.toString).count == 3)
-    } finally {
-      Utils.deleteRecursively(dir)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index ddf73d6..4755291 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -89,52 +89,50 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite with BeforeAndAfterAl
    *   3) Does the contents be the same.
    */
   test("Correctness of WholeTextFileRecordReader.") {
-    val dir = Utils.createTempDir()
-    logInfo(s"Local disk address is ${dir.toString}.")
+    withTempDir { dir =>
+      logInfo(s"Local disk address is ${dir.toString}.")
 
-    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
-      createNativeFile(dir, filename, contents, false)
-    }
+      WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+        createNativeFile(dir, filename, contents, false)
+      }
 
-    val res = sc.wholeTextFiles(dir.toString, 3).collect()
+      val res = sc.wholeTextFiles(dir.toString, 3).collect()
 
-    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
-      "Number of files read out does not fit with the actual value.")
+      assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+        "Number of files read out does not fit with the actual value.")
 
-    for ((filename, contents) <- res) {
-      val shortName = filename.split('/').last
-      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
-        s"Missing file name $filename.")
-      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
-        s"file $filename contents can not match.")
+      for ((filename, contents) <- res) {
+        val shortName = filename.split('/').last
+        assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+          s"Missing file name $filename.")
+        assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+          s"file $filename contents can not match.")
+      }
     }
-
-    Utils.deleteRecursively(dir)
   }
 
   test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
-    val dir = Utils.createTempDir()
-    logInfo(s"Local disk address is ${dir.toString}.")
+    withTempDir { dir =>
+      logInfo(s"Local disk address is ${dir.toString}.")
 
-    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
-      createNativeFile(dir, filename, contents, true)
-    }
+      WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+        createNativeFile(dir, filename, contents, true)
+      }
 
-    val res = sc.wholeTextFiles(dir.toString, 3).collect()
+      val res = sc.wholeTextFiles(dir.toString, 3).collect()
 
-    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
-      "Number of files read out does not fit with the actual value.")
+      assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+        "Number of files read out does not fit with the actual value.")
 
-    for ((filename, contents) <- res) {
-      val shortName = filename.split('/').last.split('.')(0)
+      for ((filename, contents) <- res) {
+        val shortName = filename.split('/').last.split('.')(0)
 
-      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
-        s"Missing file name $filename.")
-      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
-        s"file $filename contents can not match.")
+        assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+          s"Missing file name $filename.")
+        assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+          s"file $filename contents can not match.")
+      }
     }
-
-    Utils.deleteRecursively(dir)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 0ec359d..945b0944 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -470,15 +470,12 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("zero-partition RDD") {
-    val emptyDir = Utils.createTempDir()
-    try {
+    withTempDir { emptyDir =>
       val file = sc.textFile(emptyDir.getAbsolutePath)
       assert(file.partitions.isEmpty)
       assert(file.collect().toList === Nil)
       // Test that a shuffle on the file works, because this used to be a bug
       assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-    } finally {
-      Utils.deleteRecursively(emptyDir)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index a799b1c..5cb2b56 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -822,63 +822,66 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("file server") {
-    val conf = new SparkConf()
-    val tempDir = Utils.createTempDir()
-    val file = new File(tempDir, "file")
-    Files.write(UUID.randomUUID().toString(), file, UTF_8)
-    val fileWithSpecialChars = new File(tempDir, "file name")
-    Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
-    val empty = new File(tempDir, "empty")
-    Files.write("", empty, UTF_8);
-    val jar = new File(tempDir, "jar")
-    Files.write(UUID.randomUUID().toString(), jar, UTF_8)
-
-    val dir1 = new File(tempDir, "dir1")
-    assert(dir1.mkdir())
-    val subFile1 = new File(dir1, "file1")
-    Files.write(UUID.randomUUID().toString(), subFile1, UTF_8)
-
-    val dir2 = new File(tempDir, "dir2")
-    assert(dir2.mkdir())
-    val subFile2 = new File(dir2, "file2")
-    Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)
-
-    val fileUri = env.fileServer.addFile(file)
-    val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
-    val emptyUri = env.fileServer.addFile(empty)
-    val jarUri = env.fileServer.addJar(jar)
-    val dir1Uri = env.fileServer.addDirectory("/dir1", dir1)
-    val dir2Uri = env.fileServer.addDirectory("/dir2", dir2)
-
-    // Try registering directories with invalid names.
-    Seq("/files", "/jars").foreach { uri =>
-      intercept[IllegalArgumentException] {
-        env.fileServer.addDirectory(uri, dir1)
-      }
-    }
+    withTempDir { tempDir =>
+      withTempDir { destDir =>
+        val conf = new SparkConf()
+
+        val file = new File(tempDir, "file")
+        Files.write(UUID.randomUUID().toString(), file, UTF_8)
+        val fileWithSpecialChars = new File(tempDir, "file name")
+        Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
+        val empty = new File(tempDir, "empty")
+        Files.write("", empty, UTF_8);
+        val jar = new File(tempDir, "jar")
+        Files.write(UUID.randomUUID().toString(), jar, UTF_8)
+
+        val dir1 = new File(tempDir, "dir1")
+        assert(dir1.mkdir())
+        val subFile1 = new File(dir1, "file1")
+        Files.write(UUID.randomUUID().toString(), subFile1, UTF_8)
+
+        val dir2 = new File(tempDir, "dir2")
+        assert(dir2.mkdir())
+        val subFile2 = new File(dir2, "file2")
+        Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)
+
+        val fileUri = env.fileServer.addFile(file)
+        val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
+        val emptyUri = env.fileServer.addFile(empty)
+        val jarUri = env.fileServer.addJar(jar)
+        val dir1Uri = env.fileServer.addDirectory("/dir1", dir1)
+        val dir2Uri = env.fileServer.addDirectory("/dir2", dir2)
+
+        // Try registering directories with invalid names.
+        Seq("/files", "/jars").foreach { uri =>
+          intercept[IllegalArgumentException] {
+            env.fileServer.addDirectory(uri, dir1)
+          }
+        }
 
-    val destDir = Utils.createTempDir()
-    val sm = new SecurityManager(conf)
-    val hc = SparkHadoopUtil.get.conf
-
-    val files = Seq(
-      (file, fileUri),
-      (fileWithSpecialChars, fileWithSpecialCharsUri),
-      (empty, emptyUri),
-      (jar, jarUri),
-      (subFile1, dir1Uri + "/file1"),
-      (subFile2, dir2Uri + "/file2"))
-    files.foreach { case (f, uri) =>
-      val destFile = new File(destDir, f.getName())
-      Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
-      assert(Files.equal(f, destFile))
-    }
+        val sm = new SecurityManager(conf)
+        val hc = SparkHadoopUtil.get.conf
+
+        val files = Seq(
+          (file, fileUri),
+          (fileWithSpecialChars, fileWithSpecialCharsUri),
+          (empty, emptyUri),
+          (jar, jarUri),
+          (subFile1, dir1Uri + "/file1"),
+          (subFile2, dir2Uri + "/file2"))
+        files.foreach { case (f, uri) =>
+          val destFile = new File(destDir, f.getName())
+          Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+          assert(Files.equal(f, destFile))
+        }
 
-    // Try to download files that do not exist.
-    Seq("files", "jars", "dir1").foreach { root =>
-      intercept[Exception] {
-        val uri = env.address.toSparkURL + s"/$root/doesNotExist"
-        Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+        // Try to download files that do not exist.
+        Seq("files", "jars", "dir1").foreach { root =>
+          intercept[Exception] {
+            val uri = env.address.toSparkURL + s"/$root/doesNotExist"
+            Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+          }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 5f4ffa1..ed6a3d9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2831,18 +2831,22 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   test("SPARK-23207: reliable checkpoint can avoid rollback (checkpointed before)") {
-    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
-    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
-    shuffleMapRdd.checkpoint()
-    shuffleMapRdd.doCheckpoint()
-    assertResultStageNotRollbacked(shuffleMapRdd)
+    withTempDir { dir =>
+      sc.setCheckpointDir(dir.getCanonicalPath)
+      val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+      shuffleMapRdd.checkpoint()
+      shuffleMapRdd.doCheckpoint()
+      assertResultStageNotRollbacked(shuffleMapRdd)
+    }
   }
 
   test("SPARK-23207: reliable checkpoint fail to rollback (checkpointing now)") {
-    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
-    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
-    shuffleMapRdd.checkpoint()
-    assertResultStageFailToRollback(shuffleMapRdd)
+    withTempDir { dir =>
+      sc.setCheckpointDir(dir.getCanonicalPath)
+      val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+      shuffleMapRdd.checkpoint()
+      assertResultStageFailToRollback(shuffleMapRdd)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index d6ff5bb..848f702 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -49,11 +49,8 @@ class OutputCommitCoordinatorIntegrationSuite
   test("exception thrown in OutputCommitter.commitTask()") {
     // Regression test for SPARK-10381
     failAfter(Span(60, Seconds)) {
-      val tempDir = Utils.createTempDir()
-      try {
+      withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
-      } finally {
-        Utils.deleteRecursively(tempDir)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index a7eed4b..467e490 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -369,27 +369,27 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
-    val dir = Utils.createTempDir()
-    val tmpfile = dir.toString + "/RoaringBitmap"
-    val outStream = new FileOutputStream(tmpfile)
-    val output = new KryoOutput(outStream)
-    val bitmap = new RoaringBitmap
-    bitmap.add(1)
-    bitmap.add(3)
-    bitmap.add(5)
-    // Ignore Kryo because it doesn't use writeObject
-    bitmap.serialize(new KryoOutputObjectOutputBridge(null, output))
-    output.flush()
-    output.close()
-
-    val inStream = new FileInputStream(tmpfile)
-    val input = new KryoInput(inStream)
-    val ret = new RoaringBitmap
-    // Ignore Kryo because it doesn't use readObject
-    ret.deserialize(new KryoInputObjectInputBridge(null, input))
-    input.close()
-    assert(ret == bitmap)
-    Utils.deleteRecursively(dir)
+    withTempDir { dir =>
+      val tmpfile = dir.toString + "/RoaringBitmap"
+      val outStream = new FileOutputStream(tmpfile)
+      val output = new KryoOutput(outStream)
+      val bitmap = new RoaringBitmap
+      bitmap.add(1)
+      bitmap.add(3)
+      bitmap.add(5)
+      // Ignore Kryo because it doesn't use writeObject
+      bitmap.serialize(new KryoOutputObjectOutputBridge(null, output))
+      output.flush()
+      output.close()
+
+      val inStream = new FileInputStream(tmpfile)
+      val input = new KryoInput(inStream)
+      val ret = new RoaringBitmap
+      // Ignore Kryo because it doesn't use readObject
+      ret.deserialize(new KryoInputObjectInputBridge(null, input))
+      input.close()
+      assert(ret == bitmap)
+    }
   }
 
   test("KryoOutputObjectOutputBridge.writeObject and KryoInputObjectInputBridge.readObject") {

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index eec961a..959cf58 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -132,7 +132,6 @@ class DiskStoreSuite extends SparkFunSuite {
   }
 
   test("block data encryption") {
-    val testDir = Utils.createTempDir()
     val testData = new Array[Byte](128 * 1024)
     new Random().nextBytes(testData)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
index f9e1b79..e48f001 100644
--- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
@@ -50,34 +50,34 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext
   }
 
   test("Checkpointing") {
-    val tempDir = Utils.createTempDir()
-    val path = tempDir.toURI.toString
-    val checkpointInterval = 2
-    var rddsToCheck = Seq.empty[RDDToCheck]
-    sc.setCheckpointDir(path)
-    val rdd1 = createRDD(sc)
-    val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, rdd1.sparkContext)
-    checkpointer.update(rdd1)
-    rdd1.count()
-    rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
-    checkCheckpoint(rddsToCheck, 1, checkpointInterval)
-
-    var iteration = 2
-    while (iteration < 9) {
-      val rdd = createRDD(sc)
-      checkpointer.update(rdd)
-      rdd.count()
-      rddsToCheck = rddsToCheck :+ RDDToCheck(rdd, iteration)
-      checkCheckpoint(rddsToCheck, iteration, checkpointInterval)
-      iteration += 1
-    }
+    withTempDir { tempDir =>
+      val path = tempDir.toURI.toString
+      val checkpointInterval = 2
+      var rddsToCheck = Seq.empty[RDDToCheck]
+      sc.setCheckpointDir(path)
+      val rdd1 = createRDD(sc)
+      val checkpointer =
+        new PeriodicRDDCheckpointer[Double](checkpointInterval, rdd1.sparkContext)
+      checkpointer.update(rdd1)
+      rdd1.count()
+      rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
+      checkCheckpoint(rddsToCheck, 1, checkpointInterval)
+
+      var iteration = 2
+      while (iteration < 9) {
+        val rdd = createRDD(sc)
+        checkpointer.update(rdd)
+        rdd.count()
+        rddsToCheck = rddsToCheck :+ RDDToCheck(rdd, iteration)
+        checkCheckpoint(rddsToCheck, iteration, checkpointInterval)
+        iteration += 1
+      }
 
-    checkpointer.deleteAllCheckpoints()
-    rddsToCheck.foreach { rdd =>
-      confirmCheckpointRemoved(rdd.rdd)
+      checkpointer.deleteAllCheckpoints()
+      rddsToCheck.foreach { rdd =>
+        confirmCheckpointRemoved(rdd.rdd)
+      }
     }
-
-    Utils.deleteRecursively(tempDir)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org