Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/01 08:34:20 UTC
[1/2] spark git commit: [SPARK-26180][CORE][TEST] Reuse withTempDir function to the SparkCore test case
Repository: spark
Updated Branches:
refs/heads/master 2f6e88fec -> 327ac83f5
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f5e912b..901a724 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -295,31 +295,30 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
private val workerConf = new SparkConf()
def testOffsetBytes(isCompressed: Boolean): Unit = {
- val tmpDir2 = Utils.createTempDir()
- val suffix = getSuffix(isCompressed)
- val f1Path = tmpDir2 + "/f1" + suffix
- writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
- val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
+ withTempDir { tmpDir2 =>
+ val suffix = getSuffix(isCompressed)
+ val f1Path = tmpDir2 + "/f1" + suffix
+ writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
+ val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
- // Read first few bytes
- assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
+ // Read first few bytes
+ assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
- // Read some middle bytes
- assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
+ // Read some middle bytes
+ assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
- // Read last few bytes
- assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
+ // Read last few bytes
+ assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
- // Read some nonexistent bytes in the beginning
- assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
+ // Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
- // Read some nonexistent bytes at the end
- assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
+ // Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
- // Read some nonexistent bytes on both ends
- assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
-
- Utils.deleteRecursively(tmpDir2)
+ // Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+ }
}
test("reading offset bytes of a file") {
@@ -331,41 +330,41 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
def testOffsetBytesMultipleFiles(isCompressed: Boolean): Unit = {
- val tmpDir = Utils.createTempDir()
- val suffix = getSuffix(isCompressed)
- val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
- writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
- writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
- writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
- writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
- val fileLengths = files.map(Utils.getFileLength(_, workerConf))
-
- // Read first few bytes in the 1st file
- assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
+ withTempDir { tmpDir =>
+ val suffix = getSuffix(isCompressed)
+ val files = (1 to 3).map(i =>
+ new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
+ writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
+ val fileLengths = files.map(Utils.getFileLength(_, workerConf))
- // Read bytes within the 1st file
- assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
+ // Read first few bytes in the 1st file
+ assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
- // Read bytes across 1st and 2nd file
- assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
+ // Read bytes within the 1st file
+ assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
- // Read bytes across 1st, 2nd and 3rd file
- assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
+ // Read bytes across 1st and 2nd file
+ assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
- // Read bytes across 3rd and 4th file
- assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
+ // Read bytes across 1st, 2nd and 3rd file
+ assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
- // Read some nonexistent bytes in the beginning
- assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
+ // Read bytes across 3rd and 4th file
+ assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
- // Read some nonexistent bytes at the end
- assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
+ // Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
- // Read some nonexistent bytes on both ends
- assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
- "0123456789abcdefghijABCDEFGHIJ9876543210")
+ // Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
- Utils.deleteRecursively(tmpDir)
+ // Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
+ "0123456789abcdefghijABCDEFGHIJ9876543210")
+ }
}
test("reading offset bytes across multiple files") {
@@ -427,27 +426,28 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
- val parent: File = Utils.createTempDir()
- // The parent directory has two child directories
- val child1: File = Utils.createTempDir(parent.getCanonicalPath)
- val child2: File = Utils.createTempDir(parent.getCanonicalPath)
- val child3: File = Utils.createTempDir(child1.getCanonicalPath)
- // set the last modified time of child1 to 30 secs old
- child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
-
- // although child1 is old, child2 is still new so return true
- assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
- child2.setLastModified(System.currentTimeMillis - (1000 * 30))
- assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
- parent.setLastModified(System.currentTimeMillis - (1000 * 30))
- // although parent and its immediate children are new, child3 is still old
- // we expect a full recursive search for new files.
- assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
- child3.setLastModified(System.currentTimeMillis - (1000 * 30))
- assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+ withTempDir { parent =>
+ // The parent directory has two child directories
+ val child1: File = Utils.createTempDir(parent.getCanonicalPath)
+ val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+ val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+ // set the last modified time of child1 to 30 secs old
+ child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
+
+ // although child1 is old, child2 is still new so return true
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+ // although parent and its immediate children are new, child3 is still old
+ // we expect a full recursive search for new files.
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+ child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+ assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+ }
}
test("resolveURI") {
@@ -608,9 +608,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
test("loading properties from file") {
- val tmpDir = Utils.createTempDir()
- val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
- try {
+ withTempDir { tmpDir =>
+ val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
"spark.test.fileNameLoadB 1\n", outFile, StandardCharsets.UTF_8)
@@ -621,8 +620,6 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
val sparkConf = new SparkConf
assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
- } finally {
- Utils.deleteRecursively(tmpDir)
}
}
@@ -638,52 +635,53 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
test("fetch hcfs dir") {
- val tempDir = Utils.createTempDir()
- val sourceDir = new File(tempDir, "source-dir")
- sourceDir.mkdir()
- val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
- val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
- val targetDir = new File(tempDir, "target-dir")
- Files.write("some text", sourceFile, StandardCharsets.UTF_8)
-
- val path =
- if (Utils.isWindows) {
- new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/"))
- } else {
- new Path("file://" + sourceDir.getAbsolutePath)
- }
- val conf = new Configuration()
- val fs = Utils.getHadoopFileSystem(path.toString, conf)
+ withTempDir { tempDir =>
+ val sourceDir = new File(tempDir, "source-dir")
+ sourceDir.mkdir()
+ val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
+ val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
+ val targetDir = new File(tempDir, "target-dir")
+ Files.write("some text", sourceFile, StandardCharsets.UTF_8)
+
+ val path =
+ if (Utils.isWindows) {
+ new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/"))
+ } else {
+ new Path("file://" + sourceDir.getAbsolutePath)
+ }
+ val conf = new Configuration()
+ val fs = Utils.getHadoopFileSystem(path.toString, conf)
- assert(!targetDir.isDirectory())
- Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
- assert(targetDir.isDirectory())
+ assert(!targetDir.isDirectory())
+ Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+ assert(targetDir.isDirectory())
- // Copy again to make sure it doesn't error if the dir already exists.
- Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+ // Copy again to make sure it doesn't error if the dir already exists.
+ Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
- val destDir = new File(targetDir, sourceDir.getName())
- assert(destDir.isDirectory())
+ val destDir = new File(targetDir, sourceDir.getName())
+ assert(destDir.isDirectory())
- val destInnerDir = new File(destDir, innerSourceDir.getName)
- assert(destInnerDir.isDirectory())
+ val destInnerDir = new File(destDir, innerSourceDir.getName)
+ assert(destInnerDir.isDirectory())
- val destInnerFile = new File(destInnerDir, sourceFile.getName)
- assert(destInnerFile.isFile())
+ val destInnerFile = new File(destInnerDir, sourceFile.getName)
+ assert(destInnerFile.isFile())
- val filePath =
- if (Utils.isWindows) {
- new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/"))
- } else {
- new Path("file://" + sourceFile.getAbsolutePath)
- }
- val testFileDir = new File(tempDir, "test-filename")
- val testFileName = "testFName"
- val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
- Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
- conf, false, Some(testFileName))
- val newFileName = new File(testFileDir, testFileName)
- assert(newFileName.isFile())
+ val filePath =
+ if (Utils.isWindows) {
+ new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/"))
+ } else {
+ new Path("file://" + sourceFile.getAbsolutePath)
+ }
+ val testFileDir = new File(tempDir, "test-filename")
+ val testFileName = "testFName"
+ val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+ Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+ conf, false, Some(testFileName))
+ val newFileName = new File(testFileDir, testFileName)
+ assert(newFileName.isFile())
+ }
}
test("shutdown hook manager") {
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 2341949..85963ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -67,6 +67,17 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
}
/**
+ * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+ * returns.
+ */
+ protected override def withTempDir(f: File => Unit): Unit = {
+ super.withTempDir { dir =>
+ f(dir)
+ waitForTasksToFinish()
+ }
+ }
+
+ /**
* A helper function for turning off/on codegen.
*/
protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = {
@@ -143,43 +154,6 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
test(name) { runOnThread() }
}
}
-}
-
-/**
- * Helper trait that can be extended by all external SQL test suites.
- *
- * This allows subclasses to plugin a custom `SQLContext`.
- * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`.
- *
- * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is
- * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
- */
-private[sql] trait SQLTestUtilsBase
- extends Eventually
- with BeforeAndAfterAll
- with SQLTestData
- with PlanTestBase { self: Suite =>
-
- protected def sparkContext = spark.sparkContext
-
- // Shorthand for running a query using our SQLContext
- protected lazy val sql = spark.sql _
-
- /**
- * A helper object for importing SQL implicits.
- *
- * Note that the alternative of importing `spark.implicits._` is not possible here.
- * This is because we create the `SQLContext` immediately before the first test is run,
- * but the implicits import is needed in the constructor.
- */
- protected object testImplicits extends SQLImplicits {
- protected override def _sqlContext: SQLContext = self.spark.sqlContext
- }
-
- protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
- SparkSession.setActiveSession(spark)
- super.withSQLConf(pairs: _*)(f)
- }
/**
* Copy file in jar's resource to a temp file, then pass it to `f`.
@@ -207,21 +181,6 @@ private[sql] trait SQLTestUtilsBase
}
/**
- * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
- * returns.
- *
- * @todo Probably this method should be moved to a more general place
- */
- protected def withTempDir(f: File => Unit): Unit = {
- val dir = Utils.createTempDir().getCanonicalFile
- try f(dir) finally {
- // wait for all tasks to finish before deleting files
- waitForTasksToFinish()
- Utils.deleteRecursively(dir)
- }
- }
-
- /**
* Creates the specified number of temporary directories, which is then passed to `f` and will be
* deleted after `f` returns.
*/
@@ -233,6 +192,43 @@ private[sql] trait SQLTestUtilsBase
files.foreach(Utils.deleteRecursively)
}
}
+}
+
+/**
+ * Helper trait that can be extended by all external SQL test suites.
+ *
+ * This allows subclasses to plugin a custom `SQLContext`.
+ * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`.
+ *
+ * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is
+ * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
+ */
+private[sql] trait SQLTestUtilsBase
+ extends Eventually
+ with BeforeAndAfterAll
+ with SQLTestData
+ with PlanTestBase { self: Suite =>
+
+ protected def sparkContext = spark.sparkContext
+
+ // Shorthand for running a query using our SQLContext
+ protected lazy val sql = spark.sql _
+
+ /**
+ * A helper object for importing SQL implicits.
+ *
+ * Note that the alternative of importing `spark.implicits._` is not possible here.
+ * This is because we create the `SQLContext` immediately before the first test is run,
+ * but the implicits import is needed in the constructor.
+ */
+ protected object testImplicits extends SQLImplicits {
+ protected override def _sqlContext: SQLContext = self.spark.sqlContext
+ }
+
+ protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ SparkSession.setActiveSession(spark)
+ super.withSQLConf(pairs: _*)(f)
+ }
/**
* Drops functions after calling `f`. A function is represented by (functionName, isTemporary).
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dc96ec4..218bd18 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -56,15 +56,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
import HiveClientBuilder.buildClient
/**
- * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
- * returns.
- */
- protected def withTempDir(f: File => Unit): Unit = {
- val dir = Utils.createTempDir().getCanonicalFile
- try f(dir) finally Utils.deleteRecursively(dir)
- }
-
- /**
* Drops table `tableName` after calling `f`.
*/
protected def withTable(tableNames: String*)(f: => Unit): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index ada494e..6a0f523 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -557,16 +557,4 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
verifyOutput[W](output.toSeq, expectedOutput, useSet)
}
}
-
- /**
- * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
- * returns.
- * (originally from `SqlTestUtils`.)
- * @todo Probably this method should be moved to a more general place
- */
- protected def withTempDir(f: File => Unit): Unit = {
- val dir = Utils.createTempDir().getCanonicalFile
- try f(dir) finally Utils.deleteRecursively(dir)
- }
-
}
[2/2] spark git commit: [SPARK-26180][CORE][TEST] Reuse withTempDir function to the SparkCore test case
Posted by gu...@apache.org.
[SPARK-26180][CORE][TEST] Reuse withTempDir function to the SparkCore test case
## What changes were proposed in this pull request?
Currently, the common `withTempDir` function is only available to the Spark SQL test cases, where it handles the `val dir = Utils.createTempDir()` and `Utils.deleteRecursively(dir)` boilerplate. Unfortunately, the `withTempDir` function cannot be used in the Spark Core test cases. This PR shares the `withTempDir` function between Spark SQL and Spark Core and uses it to clean up the Spark Core test cases.
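For context, the helper being consolidated into `SparkFunSuite` is the `createTempDir`/`deleteRecursively` wrapper shown in the diff below. The following is a minimal, hypothetical usage sketch of the pattern the rewritten core suites follow; the suite name, test name, and file contents are illustrative only and are not part of this commit.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.SparkFunSuite

// Hypothetical suite; it only illustrates the withTempDir pattern this PR rolls out.
class ScratchDirSuite extends SparkFunSuite {

  test("write and read back a scratch file") {
    // withTempDir creates the directory, runs the body, and always deletes the
    // directory afterwards, replacing the manual Utils.createTempDir() /
    // Utils.deleteRecursively(dir) try/finally blocks used before this change.
    withTempDir { dir =>
      val file = new File(dir, "data.txt")
      Files.write(file.toPath, "1\n2\n3\n".getBytes(StandardCharsets.UTF_8))
      assert(new String(Files.readAllBytes(file.toPath), StandardCharsets.UTF_8) === "1\n2\n3\n")
    }
    // The temporary directory has been removed by the time withTempDir returns.
  }
}
```

Suites with extra requirements can still override the helper, as `SQLTestUtils` does below to wait for outstanding tasks before the directory is deleted.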
## How was this patch tested?
N/A
Closes #23151 from heary-cao/withCreateTempDir.
Authored-by: caoxuewen <ca...@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/327ac83f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/327ac83f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/327ac83f
Branch: refs/heads/master
Commit: 327ac83f5cf33c84775a95442862bea56d8a0005
Parents: 2f6e88f
Author: caoxuewen <ca...@zte.com.cn>
Authored: Sat Dec 1 16:34:11 2018 +0800
Committer: Hyukjin Kwon <gu...@apache.org>
Committed: Sat Dec 1 16:34:11 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/CheckpointSuite.scala | 5 +-
.../org/apache/spark/ContextCleanerSuite.scala | 97 ++--
.../test/scala/org/apache/spark/FileSuite.scala | 19 +-
.../org/apache/spark/SparkContextSuite.scala | 315 +++++------
.../scala/org/apache/spark/SparkFunSuite.scala | 12 +-
.../spark/api/python/PythonBroadcastSuite.scala | 5 +-
.../apache/spark/deploy/SparkSubmitSuite.scala | 539 ++++++++++---------
.../deploy/history/FsHistoryProviderSuite.scala | 89 +--
.../history/HistoryServerArgumentsSuite.scala | 8 +-
.../deploy/master/PersistenceEngineSuite.scala | 5 +-
.../input/WholeTextFileInputFormatSuite.scala | 6 +-
.../input/WholeTextFileRecordReaderSuite.scala | 62 ++-
.../spark/rdd/PairRDDFunctionsSuite.scala | 5 +-
.../org/apache/spark/rpc/RpcEnvSuite.scala | 113 ++--
.../spark/scheduler/DAGSchedulerSuite.scala | 22 +-
...utputCommitCoordinatorIntegrationSuite.scala | 5 +-
.../spark/serializer/KryoSerializerSuite.scala | 42 +-
.../apache/spark/storage/DiskStoreSuite.scala | 1 -
.../util/PeriodicRDDCheckpointerSuite.scala | 52 +-
.../org/apache/spark/util/UtilsSuite.scala | 222 ++++----
.../apache/spark/sql/test/SQLTestUtils.scala | 100 ++--
.../spark/sql/hive/client/VersionsSuite.scala | 9 -
.../apache/spark/streaming/TestSuiteBase.scala | 12 -
23 files changed, 858 insertions(+), 887 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 48408cc..6d9e47c 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -586,8 +586,7 @@ object CheckpointSuite {
class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
test("checkpoint compression") {
- val checkpointDir = Utils.createTempDir()
- try {
+ withTempDir { checkpointDir =>
val conf = new SparkConf()
.set("spark.checkpoint.compress", "true")
.set("spark.ui.enabled", "false")
@@ -616,8 +615,6 @@ class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
// Verify that the compressed content can be read back
assert(rdd.collect().toSeq === (1 to 20))
- } finally {
- Utils.deleteRecursively(checkpointDir)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 6724af9..1fcc975 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -207,54 +207,55 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}
test("automatically cleanup normal checkpoint") {
- val checkpointDir = Utils.createTempDir()
- checkpointDir.delete()
- var rdd = newPairRDD()
- sc.setCheckpointDir(checkpointDir.toString)
- rdd.checkpoint()
- rdd.cache()
- rdd.collect()
- var rddId = rdd.id
-
- // Confirm the checkpoint directory exists
- assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
- val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
- val fs = path.getFileSystem(sc.hadoopConfiguration)
- assert(fs.exists(path))
-
- // the checkpoint is not cleaned by default (without the configuration set)
- var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
- rdd = null // Make RDD out of scope, ok if collected earlier
- runGC()
- postGCTester.assertCleanup()
- assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
- // Verify that checkpoints are NOT cleaned up if the config is not enabled
- sc.stop()
- val conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("cleanupCheckpoint")
- .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
- sc = new SparkContext(conf)
- rdd = newPairRDD()
- sc.setCheckpointDir(checkpointDir.toString)
- rdd.checkpoint()
- rdd.cache()
- rdd.collect()
- rddId = rdd.id
-
- // Confirm the checkpoint directory exists
- assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
- // Reference rdd to defeat any early collection by the JVM
- rdd.count()
-
- // Test that GC causes checkpoint data cleanup after dereferencing the RDD
- postGCTester = new CleanerTester(sc, Seq(rddId))
- rdd = null // Make RDD out of scope
- runGC()
- postGCTester.assertCleanup()
- assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+ withTempDir { checkpointDir =>
+ checkpointDir.delete()
+ var rdd = newPairRDD()
+ sc.setCheckpointDir(checkpointDir.toString)
+ rdd.checkpoint()
+ rdd.cache()
+ rdd.collect()
+ var rddId = rdd.id
+
+ // Confirm the checkpoint directory exists
+ assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
+ val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
+ val fs = path.getFileSystem(sc.hadoopConfiguration)
+ assert(fs.exists(path))
+
+ // the checkpoint is not cleaned by default (without the configuration set)
+ var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
+ rdd = null // Make RDD out of scope, ok if collected earlier
+ runGC()
+ postGCTester.assertCleanup()
+ assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+ // Verify that checkpoints are NOT cleaned up if the config is not enabled
+ sc.stop()
+ val conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("cleanupCheckpoint")
+ .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
+ sc = new SparkContext(conf)
+ rdd = newPairRDD()
+ sc.setCheckpointDir(checkpointDir.toString)
+ rdd.checkpoint()
+ rdd.cache()
+ rdd.collect()
+ rddId = rdd.id
+
+ // Confirm the checkpoint directory exists
+ assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+ // Reference rdd to defeat any early collection by the JVM
+ rdd.count()
+
+ // Test that GC causes checkpoint data cleanup after dereferencing the RDD
+ postGCTester = new CleanerTester(sc, Seq(rddId))
+ rdd = null // Make RDD out of scope
+ runGC()
+ postGCTester.assertCleanup()
+ assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+ }
}
test("automatically clean up local checkpoint") {
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index df04a5e..983a791 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -306,17 +306,18 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
.set("spark.files.openCostInBytes", "0")
.set("spark.default.parallelism", "1"))
- val tempDir = Utils.createTempDir()
- val tempDirPath = tempDir.getAbsolutePath
+ withTempDir { tempDir =>
+ val tempDirPath = tempDir.getAbsolutePath
- for (i <- 0 until 8) {
- val tempFile = new File(tempDir, s"part-0000$i")
- Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
- StandardCharsets.UTF_8)
- }
+ for (i <- 0 until 8) {
+ val tempFile = new File(tempDir, s"part-0000$i")
+ Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
+ StandardCharsets.UTF_8)
+ }
- for (p <- Seq(1, 2, 8)) {
- assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+ for (p <- Seq(1, 2, 8)) {
+ assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 79192f3..ec4c7ef 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -116,56 +116,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
test("basic case for addFile and listFiles") {
- val dir = Utils.createTempDir()
-
- val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
- val absolutePath1 = file1.getAbsolutePath
-
- val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
- val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
- val absolutePath2 = file2.getAbsolutePath
-
- try {
- Files.write("somewords1", file1, StandardCharsets.UTF_8)
- Files.write("somewords2", file2, StandardCharsets.UTF_8)
- val length1 = file1.length()
- val length2 = file2.length()
-
- sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
- sc.addFile(file1.getAbsolutePath)
- sc.addFile(relativePath)
- sc.parallelize(Array(1), 1).map(x => {
- val gotten1 = new File(SparkFiles.get(file1.getName))
- val gotten2 = new File(SparkFiles.get(file2.getName))
- if (!gotten1.exists()) {
- throw new SparkException("file doesn't exist : " + absolutePath1)
- }
- if (!gotten2.exists()) {
- throw new SparkException("file doesn't exist : " + absolutePath2)
- }
+ withTempDir { dir =>
+ val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
+ val absolutePath1 = file1.getAbsolutePath
+
+ val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
+ val relativePath = file2.getParent + "/../" + file2.getParentFile.getName +
+ "/" + file2.getName
+ val absolutePath2 = file2.getAbsolutePath
+
+ try {
+ Files.write("somewords1", file1, StandardCharsets.UTF_8)
+ Files.write("somewords2", file2, StandardCharsets.UTF_8)
+ val length1 = file1.length()
+ val length2 = file2.length()
+
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ sc.addFile(file1.getAbsolutePath)
+ sc.addFile(relativePath)
+ sc.parallelize(Array(1), 1).map(x => {
+ val gotten1 = new File(SparkFiles.get(file1.getName))
+ val gotten2 = new File(SparkFiles.get(file2.getName))
+ if (!gotten1.exists()) {
+ throw new SparkException("file doesn't exist : " + absolutePath1)
+ }
+ if (!gotten2.exists()) {
+ throw new SparkException("file doesn't exist : " + absolutePath2)
+ }
- if (length1 != gotten1.length()) {
- throw new SparkException(
- s"file has different length $length1 than added file ${gotten1.length()} : " +
- absolutePath1)
- }
- if (length2 != gotten2.length()) {
- throw new SparkException(
- s"file has different length $length2 than added file ${gotten2.length()} : " +
- absolutePath2)
- }
+ if (length1 != gotten1.length()) {
+ throw new SparkException(
+ s"file has different length $length1 than added file ${gotten1.length()} : " +
+ absolutePath1)
+ }
+ if (length2 != gotten2.length()) {
+ throw new SparkException(
+ s"file has different length $length2 than added file ${gotten2.length()} : " +
+ absolutePath2)
+ }
- if (absolutePath1 == gotten1.getAbsolutePath) {
- throw new SparkException("file should have been copied :" + absolutePath1)
- }
- if (absolutePath2 == gotten2.getAbsolutePath) {
- throw new SparkException("file should have been copied : " + absolutePath2)
- }
- x
- }).count()
- assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
- } finally {
- sc.stop()
+ if (absolutePath1 == gotten1.getAbsolutePath) {
+ throw new SparkException("file should have been copied :" + absolutePath1)
+ }
+ if (absolutePath2 == gotten2.getAbsolutePath) {
+ throw new SparkException("file should have been copied : " + absolutePath2)
+ }
+ x
+ }).count()
+ assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+ } finally {
+ sc.stop()
+ }
}
}
@@ -202,51 +203,51 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
test("addFile recursive works") {
- val pluto = Utils.createTempDir()
- val neptune = Utils.createTempDir(pluto.getAbsolutePath)
- val saturn = Utils.createTempDir(neptune.getAbsolutePath)
- val alien1 = File.createTempFile("alien", "1", neptune)
- val alien2 = File.createTempFile("alien", "2", saturn)
-
- try {
- sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
- sc.addFile(neptune.getAbsolutePath, true)
- sc.parallelize(Array(1), 1).map(x => {
- val sep = File.separator
- if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
- throw new SparkException("can't access file under root added directory")
- }
- if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName))
- .exists()) {
- throw new SparkException("can't access file in nested directory")
- }
- if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName))
- .exists()) {
- throw new SparkException("file exists that shouldn't")
- }
- x
- }).count()
- } finally {
- sc.stop()
+ withTempDir { pluto =>
+ val neptune = Utils.createTempDir(pluto.getAbsolutePath)
+ val saturn = Utils.createTempDir(neptune.getAbsolutePath)
+ val alien1 = File.createTempFile("alien", "1", neptune)
+ val alien2 = File.createTempFile("alien", "2", saturn)
+
+ try {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ sc.addFile(neptune.getAbsolutePath, true)
+ sc.parallelize(Array(1), 1).map(x => {
+ val sep = File.separator
+ if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
+ throw new SparkException("can't access file under root added directory")
+ }
+ if (!new File(SparkFiles.get(
+ neptune.getName + sep + saturn.getName + sep + alien2.getName)).exists()) {
+ throw new SparkException("can't access file in nested directory")
+ }
+ if (new File(SparkFiles.get(
+ pluto.getName + sep + neptune.getName + sep + alien1.getName)).exists()) {
+ throw new SparkException("file exists that shouldn't")
+ }
+ x
+ }).count()
+ } finally {
+ sc.stop()
+ }
}
}
test("addFile recursive can't add directories by default") {
- val dir = Utils.createTempDir()
-
- try {
- sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
- intercept[SparkException] {
- sc.addFile(dir.getAbsolutePath)
+ withTempDir { dir =>
+ try {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ intercept[SparkException] {
+ sc.addFile(dir.getAbsolutePath)
+ }
+ } finally {
+ sc.stop()
}
- } finally {
- sc.stop()
}
}
test("cannot call addFile with different paths that have the same filename") {
- val dir = Utils.createTempDir()
- try {
+ withTempDir { dir =>
val subdir1 = new File(dir, "subdir1")
val subdir2 = new File(dir, "subdir2")
assert(subdir1.mkdir())
@@ -267,8 +268,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
sc.addFile(file2.getAbsolutePath)
}
assert(getAddedFileContents() === "old")
- } finally {
- Utils.deleteRecursively(dir)
}
}
@@ -296,30 +295,33 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
test("add jar with invalid path") {
- val tmpDir = Utils.createTempDir()
- val tmpJar = File.createTempFile("test", ".jar", tmpDir)
+ withTempDir { tmpDir =>
+ val tmpJar = File.createTempFile("test", ".jar", tmpDir)
- sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
- sc.addJar(tmpJar.getAbsolutePath)
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ sc.addJar(tmpJar.getAbsolutePath)
- // Invalid jar path will only print the error log, will not add to file server.
- sc.addJar("dummy.jar")
- sc.addJar("")
- sc.addJar(tmpDir.getAbsolutePath)
+ // Invalid jar path will only print the error log, will not add to file server.
+ sc.addJar("dummy.jar")
+ sc.addJar("")
+ sc.addJar(tmpDir.getAbsolutePath)
- assert(sc.listJars().size == 1)
- assert(sc.listJars().head.contains(tmpJar.getName))
+ assert(sc.listJars().size == 1)
+ assert(sc.listJars().head.contains(tmpJar.getName))
+ }
}
test("SPARK-22585 addJar argument without scheme is interpreted literally without url decoding") {
- val tmpDir = new File(Utils.createTempDir(), "host%3A443")
- tmpDir.mkdirs()
- val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir)
+ withTempDir { dir =>
+ val tmpDir = new File(dir, "host%3A443")
+ tmpDir.mkdirs()
+ val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir)
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
- sc.addJar(tmpJar.getAbsolutePath)
- assert(sc.listJars().size === 1)
+ sc.addJar(tmpJar.getAbsolutePath)
+ assert(sc.listJars().size === 1)
+ }
}
test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
@@ -340,60 +342,61 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
// Regression test for SPARK-7155
// dir1 and dir2 are used for wholeTextFiles and binaryFiles
- val dir1 = Utils.createTempDir()
- val dir2 = Utils.createTempDir()
-
- val dirpath1 = dir1.getAbsolutePath
- val dirpath2 = dir2.getAbsolutePath
-
- // file1 and file2 are placed inside dir1, they are also used for
- // textFile, hadoopFile, and newAPIHadoopFile
- // file3, file4 and file5 are placed inside dir2, they are used for
- // textFile, hadoopFile, and newAPIHadoopFile as well
- val file1 = new File(dir1, "part-00000")
- val file2 = new File(dir1, "part-00001")
- val file3 = new File(dir2, "part-00000")
- val file4 = new File(dir2, "part-00001")
- val file5 = new File(dir2, "part-00002")
-
- val filepath1 = file1.getAbsolutePath
- val filepath2 = file2.getAbsolutePath
- val filepath3 = file3.getAbsolutePath
- val filepath4 = file4.getAbsolutePath
- val filepath5 = file5.getAbsolutePath
-
-
- try {
- // Create 5 text files.
- Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
- StandardCharsets.UTF_8)
- Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8)
- Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
- Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8)
- Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8)
-
- sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-
- // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
- assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
- assert(sc.hadoopFile(filepath1 + "," + filepath2,
- classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
- assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
- classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
-
- // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
- assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
- assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
- classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
- assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
- classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
-
- // Test wholeTextFiles, and binaryFiles for dir1 and dir2
- assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
- assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
-
- } finally {
- sc.stop()
+ withTempDir { dir1 =>
+ withTempDir { dir2 =>
+ val dirpath1 = dir1.getAbsolutePath
+ val dirpath2 = dir2.getAbsolutePath
+
+ // file1 and file2 are placed inside dir1, they are also used for
+ // textFile, hadoopFile, and newAPIHadoopFile
+ // file3, file4 and file5 are placed inside dir2, they are used for
+ // textFile, hadoopFile, and newAPIHadoopFile as well
+ val file1 = new File(dir1, "part-00000")
+ val file2 = new File(dir1, "part-00001")
+ val file3 = new File(dir2, "part-00000")
+ val file4 = new File(dir2, "part-00001")
+ val file5 = new File(dir2, "part-00002")
+
+ val filepath1 = file1.getAbsolutePath
+ val filepath2 = file2.getAbsolutePath
+ val filepath3 = file3.getAbsolutePath
+ val filepath4 = file4.getAbsolutePath
+ val filepath5 = file5.getAbsolutePath
+
+
+ try {
+ // Create 5 text files.
+ Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
+ StandardCharsets.UTF_8)
+ Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8)
+ Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
+ Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8)
+ Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8)
+
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+ // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
+ assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
+ assert(sc.hadoopFile(filepath1 + "," + filepath2,
+ classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+ assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
+ classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+ // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
+ assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
+ assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+ classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+ assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+ classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+ // Test wholeTextFiles, and binaryFiles for dir1 and dir2
+ assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
+ assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
+
+ } finally {
+ sc.stop()
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3128902..dad24d7 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -23,7 +23,7 @@ import java.io.File
import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
import org.apache.spark.internal.Logging
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, Utils}
/**
* Base abstract class for all unit tests in Spark for handling common functionality.
@@ -106,4 +106,14 @@ abstract class SparkFunSuite
}
}
+ /**
+ * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+ * returns.
+ */
+ protected def withTempDir(f: File => Unit): Unit = {
+ val dir = Utils.createTempDir()
+ try f(dir) finally {
+ Utils.deleteRecursively(dir)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index b38a366..7407a65 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.util.Utils
// a PythonBroadcast:
class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkContext {
test("PythonBroadcast can be serialized with Kryo (SPARK-4882)") {
- val tempDir = Utils.createTempDir()
val broadcastedString = "Hello, world!"
def assertBroadcastIsValid(broadcast: PythonBroadcast): Unit = {
val source = Source.fromFile(broadcast.path)
@@ -39,7 +38,7 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
source.close()
contents should be (broadcastedString)
}
- try {
+ withTempDir { tempDir =>
val broadcastDataFile: File = {
val file = new File(tempDir, "broadcastData")
val printWriter = new PrintWriter(file)
@@ -53,8 +52,6 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
val deserializedBroadcast =
Utils.clone[PythonBroadcast](broadcast, new KryoSerializer(conf).newInstance())
assertBroadcastIsValid(deserializedBroadcast)
- } finally {
- Utils.deleteRecursively(tempDir)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c093789..a8973d1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -494,13 +494,11 @@ class SparkSubmitSuite
}
test("launch simple application with spark-submit with redaction") {
- val testDir = Utils.createTempDir()
- testDir.deleteOnExit()
- val testDirPath = new Path(testDir.getAbsolutePath())
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val fileSystem = Utils.getHadoopFileSystem("/",
SparkHadoopUtil.get.newConfiguration(new SparkConf()))
- try {
+ withTempDir { testDir =>
+ val testDirPath = new Path(testDir.getAbsolutePath())
val args = Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
@@ -519,8 +517,6 @@ class SparkSubmitSuite
Source.fromInputStream(logData).getLines().foreach { line =>
assert(!line.contains("secret_password"))
}
- } finally {
- Utils.deleteRecursively(testDir)
}
}
@@ -614,108 +610,112 @@ class SparkSubmitSuite
assert(new File(rScriptDir).exists)
// compile a small jar containing a class that will be called from R code.
- val tempDir = Utils.createTempDir()
- val srcDir = new File(tempDir, "sparkrtest")
- srcDir.mkdirs()
- val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
- """package sparkrtest;
+ withTempDir { tempDir =>
+ val srcDir = new File(tempDir, "sparkrtest")
+ srcDir.mkdirs()
+ val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
+ """package sparkrtest;
|
|public class DummyClass implements java.io.Serializable {
| public static String helloWorld(String arg) { return "Hello " + arg; }
| public static int addStuff(int arg1, int arg2) { return arg1 + arg2; }
|}
- """.stripMargin)
- val excFile = TestUtils.createCompiledClass("DummyClass", srcDir, excSource, Seq.empty)
- val jarFile = new File(tempDir, "sparkRTestJar-%s.jar".format(System.currentTimeMillis()))
- val jarURL = TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("sparkrtest"))
+ """.
+ stripMargin)
+ val excFile = TestUtils.createCompiledClass("DummyClass", srcDir, excSource, Seq.empty)
+ val jarFile = new File(tempDir, "sparkRTestJar-%s.jar".format(System.currentTimeMillis()))
+ val jarURL = TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("sparkrtest"))
- val args = Seq(
- "--name", "testApp",
- "--master", "local",
- "--jars", jarURL.toString,
- "--verbose",
- "--conf", "spark.ui.enabled=false",
- rScriptDir)
- runSparkSubmit(args)
+ val args = Seq(
+ "--name", "testApp",
+ "--master", "local",
+ "--jars", jarURL.toString,
+ "--verbose",
+ "--conf", "spark.ui.enabled=false",
+ rScriptDir)
+ runSparkSubmit(args)
+ }
}
test("resolves command line argument paths correctly") {
- val dir = Utils.createTempDir()
- val archive = Paths.get(dir.toPath.toString, "single.zip")
- Files.createFile(archive)
- val jars = "/jar1,/jar2"
- val files = "local:/file1,file2"
- val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
- val pyFiles = "py-file1,py-file2"
-
- // Test jars and files
- val clArgs = Seq(
- "--master", "local",
- "--class", "org.SomeClass",
- "--jars", jars,
- "--files", files,
- "thejar.jar")
- val appArgs = new SparkSubmitArguments(clArgs)
- val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
- appArgs.jars should be (Utils.resolveURIs(jars))
- appArgs.files should be (Utils.resolveURIs(files))
- conf.get("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
- conf.get("spark.files") should be (Utils.resolveURIs(files))
-
- // Test files and archives (Yarn)
- val clArgs2 = Seq(
- "--master", "yarn",
- "--class", "org.SomeClass",
- "--files", files,
- "--archives", archives,
- "thejar.jar"
- )
- val appArgs2 = new SparkSubmitArguments(clArgs2)
- val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
- appArgs2.files should be (Utils.resolveURIs(files))
- appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
- conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
- conf2.get("spark.yarn.dist.archives") should fullyMatch regex
- ("file:/archive1,file:.*#archive3")
-
- // Test python files
- val clArgs3 = Seq(
- "--master", "local",
- "--py-files", pyFiles,
- "--conf", "spark.pyspark.driver.python=python3.4",
- "--conf", "spark.pyspark.python=python3.5",
- "mister.py"
- )
- val appArgs3 = new SparkSubmitArguments(clArgs3)
- val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
- appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
- conf3.get("spark.submit.pyFiles") should be (
- PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
- conf3.get(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
- conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
+ withTempDir { dir =>
+ val archive = Paths.get(dir.toPath.toString, "single.zip")
+ Files.createFile(archive)
+ val jars = "/jar1,/jar2"
+ val files = "local:/file1,file2"
+ val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+ val pyFiles = "py-file1,py-file2"
+
+ // Test jars and files
+ val clArgs = Seq(
+ "--master", "local",
+ "--class", "org.SomeClass",
+ "--jars", jars,
+ "--files", files,
+ "thejar.jar")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
+ appArgs.jars should be(Utils.resolveURIs(jars))
+ appArgs.files should be(Utils.resolveURIs(files))
+ conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+ conf.get("spark.files") should be(Utils.resolveURIs(files))
+
+ // Test files and archives (Yarn)
+ val clArgs2 = Seq(
+ "--master", "yarn",
+ "--class", "org.SomeClass",
+ "--files", files,
+ "--archives", archives,
+ "thejar.jar"
+ )
+ val appArgs2 = new SparkSubmitArguments(clArgs2)
+ val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
+ appArgs2.files should be(Utils.resolveURIs(files))
+ appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
+ conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+ conf2.get("spark.yarn.dist.archives") should fullyMatch regex
+ ("file:/archive1,file:.*#archive3")
+
+ // Test python files
+ val clArgs3 = Seq(
+ "--master", "local",
+ "--py-files", pyFiles,
+ "--conf", "spark.pyspark.driver.python=python3.4",
+ "--conf", "spark.pyspark.python=python3.5",
+ "mister.py"
+ )
+ val appArgs3 = new SparkSubmitArguments(clArgs3)
+ val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
+ appArgs3.pyFiles should be(Utils.resolveURIs(pyFiles))
+ conf3.get("spark.submit.pyFiles") should be(
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ conf3.get(PYSPARK_DRIVER_PYTHON.key) should be("python3.4")
+ conf3.get(PYSPARK_PYTHON.key) should be("python3.5")
+ }
}
test("ambiguous archive mapping results in error message") {
- val dir = Utils.createTempDir()
- val archive1 = Paths.get(dir.toPath.toString, "first.zip")
- val archive2 = Paths.get(dir.toPath.toString, "second.zip")
- Files.createFile(archive1)
- Files.createFile(archive2)
- val jars = "/jar1,/jar2"
- val files = "local:/file1,file2"
- val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
- val pyFiles = "py-file1,py-file2"
-
- // Test files and archives (Yarn)
- val clArgs2 = Seq(
- "--master", "yarn",
- "--class", "org.SomeClass",
- "--files", files,
- "--archives", archives,
- "thejar.jar"
- )
+ withTempDir { dir =>
+ val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+ val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+ Files.createFile(archive1)
+ Files.createFile(archive2)
+ val jars = "/jar1,/jar2"
+ val files = "local:/file1,file2"
+ val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+ val pyFiles = "py-file1,py-file2"
+
+ // Test files and archives (Yarn)
+ val clArgs2 = Seq(
+ "--master", "yarn",
+ "--class", "org.SomeClass",
+ "--files", files,
+ "--archives", archives,
+ "thejar.jar"
+ )
- testPrematureExit(clArgs2.toArray, "resolves ambiguously to multiple files")
+ testPrematureExit(clArgs2.toArray, "resolves ambiguously to multiple files")
+ }
}
test("resolves config paths correctly") {
@@ -724,77 +724,77 @@ class SparkSubmitSuite
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
- val tmpDir = Utils.createTempDir()
-
- // Test jars and files
- val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
- val writer1 = new PrintWriter(f1)
- writer1.println("spark.jars " + jars)
- writer1.println("spark.files " + files)
- writer1.close()
- val clArgs = Seq(
- "--master", "local",
- "--class", "org.SomeClass",
- "--properties-file", f1.getPath,
- "thejar.jar"
- )
- val appArgs = new SparkSubmitArguments(clArgs)
- val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
- conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
- conf.get("spark.files") should be(Utils.resolveURIs(files))
-
- // Test files and archives (Yarn)
- val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
- val writer2 = new PrintWriter(f2)
- writer2.println("spark.yarn.dist.files " + files)
- writer2.println("spark.yarn.dist.archives " + archives)
- writer2.close()
- val clArgs2 = Seq(
- "--master", "yarn",
- "--class", "org.SomeClass",
- "--properties-file", f2.getPath,
- "thejar.jar"
- )
- val appArgs2 = new SparkSubmitArguments(clArgs2)
- val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
- conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
- conf2.get("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
-
- // Test python files
- val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
- val writer3 = new PrintWriter(f3)
- writer3.println("spark.submit.pyFiles " + pyFiles)
- writer3.close()
- val clArgs3 = Seq(
- "--master", "local",
- "--properties-file", f3.getPath,
- "mister.py"
- )
- val appArgs3 = new SparkSubmitArguments(clArgs3)
- val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
- conf3.get("spark.submit.pyFiles") should be(
- PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
-
- // Test remote python files
- val hadoopConf = new Configuration()
- updateConfWithFakeS3Fs(hadoopConf)
- val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
- val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
- val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
- val writer4 = new PrintWriter(f4)
- val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
- writer4.println("spark.submit.pyFiles " + remotePyFiles)
- writer4.close()
- val clArgs4 = Seq(
- "--master", "yarn",
- "--deploy-mode", "cluster",
- "--properties-file", f4.getPath,
- "hdfs:///tmp/mister.py"
- )
- val appArgs4 = new SparkSubmitArguments(clArgs4)
- val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
- // Should not format python path for yarn cluster mode
- conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
+ withTempDir { tmpDir =>
+ // Test jars and files
+ val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
+ val writer1 = new PrintWriter(f1)
+ writer1.println("spark.jars " + jars)
+ writer1.println("spark.files " + files)
+ writer1.close()
+ val clArgs = Seq(
+ "--master", "local",
+ "--class", "org.SomeClass",
+ "--properties-file", f1.getPath,
+ "thejar.jar"
+ )
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
+ conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+ conf.get("spark.files") should be(Utils.resolveURIs(files))
+
+ // Test files and archives (Yarn)
+ val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
+ val writer2 = new PrintWriter(f2)
+ writer2.println("spark.yarn.dist.files " + files)
+ writer2.println("spark.yarn.dist.archives " + archives)
+ writer2.close()
+ val clArgs2 = Seq(
+ "--master", "yarn",
+ "--class", "org.SomeClass",
+ "--properties-file", f2.getPath,
+ "thejar.jar"
+ )
+ val appArgs2 = new SparkSubmitArguments(clArgs2)
+ val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
+ conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
+ conf2.get("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
+
+ // Test python files
+ val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
+ val writer3 = new PrintWriter(f3)
+ writer3.println("spark.submit.pyFiles " + pyFiles)
+ writer3.close()
+ val clArgs3 = Seq(
+ "--master", "local",
+ "--properties-file", f3.getPath,
+ "mister.py"
+ )
+ val appArgs3 = new SparkSubmitArguments(clArgs3)
+ val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
+ conf3.get("spark.submit.pyFiles") should be(
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+
+ // Test remote python files
+ val hadoopConf = new Configuration()
+ updateConfWithFakeS3Fs(hadoopConf)
+ val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
+ val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
+ val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
+ val writer4 = new PrintWriter(f4)
+ val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
+ writer4.println("spark.submit.pyFiles " + remotePyFiles)
+ writer4.close()
+ val clArgs4 = Seq(
+ "--master", "yarn",
+ "--deploy-mode", "cluster",
+ "--properties-file", f4.getPath,
+ "hdfs:///tmp/mister.py"
+ )
+ val appArgs4 = new SparkSubmitArguments(clArgs4)
+ val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
+ // Should not format python path for yarn cluster mode
+ conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
+ }
}
test("user classpath first in driver") {
@@ -828,46 +828,50 @@ class SparkSubmitSuite
}
test("support glob path") {
- val tmpJarDir = Utils.createTempDir()
- val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
- val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)
-
- val tmpFileDir = Utils.createTempDir()
- val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
- val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)
-
- val tmpPyFileDir = Utils.createTempDir()
- val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
- val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)
-
- val tmpArchiveDir = Utils.createTempDir()
- val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
- val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
-
- val tempPyFile = File.createTempFile("tmpApp", ".py")
- tempPyFile.deleteOnExit()
-
- val args = Seq(
- "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
- "--name", "testApp",
- "--master", "yarn",
- "--deploy-mode", "client",
- "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
- "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
- "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
- "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
- tempPyFile.toURI().toString())
-
- val appArgs = new SparkSubmitArguments(args)
- val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
- conf.get("spark.yarn.dist.jars").split(",").toSet should be
- (Set(jar1.toURI.toString, jar2.toURI.toString))
- conf.get("spark.yarn.dist.files").split(",").toSet should be
- (Set(file1.toURI.toString, file2.toURI.toString))
- conf.get("spark.yarn.dist.pyFiles").split(",").toSet should be
- (Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath))
- conf.get("spark.yarn.dist.archives").split(",").toSet should be
- (Set(archive1.toURI.toString, archive2.toURI.toString))
+ withTempDir { tmpJarDir =>
+ withTempDir { tmpFileDir =>
+ withTempDir { tmpPyFileDir =>
+ withTempDir { tmpArchiveDir =>
+ val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
+ val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)
+
+ val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
+ val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)
+
+ val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
+ val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)
+
+ val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
+ val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
+
+ val tempPyFile = File.createTempFile("tmpApp", ".py")
+ tempPyFile.deleteOnExit()
+
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+ "--name", "testApp",
+ "--master", "yarn",
+ "--deploy-mode", "client",
+ "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
+ "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
+ "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
+ "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
+ tempPyFile.toURI().toString())
+
+ val appArgs = new SparkSubmitArguments(args)
+ val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
+ conf.get("spark.yarn.dist.jars").split(",").toSet should be
+ (Set(jar1.toURI.toString, jar2.toURI.toString))
+ conf.get("spark.yarn.dist.files").split(",").toSet should be
+ (Set(file1.toURI.toString, file2.toURI.toString))
+ conf.get("spark.yarn.dist.pyFiles").split(",").toSet should be
+ (Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath))
+ conf.get("spark.yarn.dist.archives").split(",").toSet should be
+ (Set(archive1.toURI.toString, archive2.toURI.toString))
+ }
+ }
+ }
+ }
}
// scalastyle:on println
@@ -985,37 +989,38 @@ class SparkSubmitSuite
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
- val tmpDir = Utils.createTempDir()
- val file = File.createTempFile("tmpFile", "", tmpDir)
- val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
- val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
- val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
- val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+ withTempDir { tmpDir =>
+ val file = File.createTempFile("tmpFile", "", tmpDir)
+ val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+ val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+ val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+ val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
- val args = Seq(
- "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
- "--name", "testApp",
- "--master", "yarn",
- "--deploy-mode", "client",
- "--jars", tmpJarPath,
- "--files", s"s3a://${file.getAbsolutePath}",
- "--py-files", s"s3a://${pyFile.getAbsolutePath}",
- s"s3a://$mainResource"
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+ "--name", "testApp",
+ "--master", "yarn",
+ "--deploy-mode", "client",
+ "--jars", tmpJarPath,
+ "--files", s"s3a://${file.getAbsolutePath}",
+ "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+ s"s3a://$mainResource"
)
- val appArgs = new SparkSubmitArguments(args)
- val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
+ val appArgs = new SparkSubmitArguments(args)
+ val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
- // All the resources should still be remote paths, so that YARN client will not upload again.
- conf.get("spark.yarn.dist.jars") should be (tmpJarPath)
- conf.get("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
- conf.get("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
+ // All the resources should still be remote paths, so that YARN client will not upload again.
+ conf.get("spark.yarn.dist.jars") should be(tmpJarPath)
+ conf.get("spark.yarn.dist.files") should be(s"s3a://${file.getAbsolutePath}")
+ conf.get("spark.yarn.dist.pyFiles") should be(s"s3a://${pyFile.getAbsolutePath}")
- // Local repl jars should be a local path.
- conf.get("spark.repl.local.jars") should (startWith("file:"))
+ // Local repl jars should be a local path.
+ conf.get("spark.repl.local.jars") should (startWith("file:"))
- // local py files should not be a URI format.
- conf.get("spark.submit.pyFiles") should (startWith("/"))
+ // local py files should not be a URI format.
+ conf.get("spark.submit.pyFiles") should (startWith("/"))
+ }
}
test("download remote resource if it is not supported by yarn service") {
@@ -1095,18 +1100,13 @@ class SparkSubmitSuite
}
private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
- val tmpDir = Utils.createTempDir()
-
- val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
- val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf), StandardCharsets.UTF_8)
- for ((key, value) <- defaults) writer.write(s"$key $value\n")
-
- writer.close()
-
- try {
+ withTempDir { tmpDir =>
+ val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
+ val writer =
+ new OutputStreamWriter(new FileOutputStream(defaultsConf), StandardCharsets.UTF_8)
+ for ((key, value) <- defaults) writer.write(s"$key $value\n")
+ writer.close()
f(tmpDir.getAbsolutePath)
- } finally {
- Utils.deleteRecursively(tmpDir)
}
}
@@ -1134,39 +1134,40 @@ class SparkSubmitSuite
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
- val tmpDir = Utils.createTempDir()
- val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
+ withTempDir { tmpDir =>
+ val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
- val args = Seq(
- "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
- "--name", "testApp",
- "--master", "yarn",
- "--deploy-mode", "client",
- "--py-files", s"s3a://${pyFile.getAbsolutePath}",
- "spark-internal"
- )
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+ "--name", "testApp",
+ "--master", "yarn",
+ "--deploy-mode", "client",
+ "--py-files", s"s3a://${pyFile.getAbsolutePath}",
+ "spark-internal"
+ )
- val appArgs = new SparkSubmitArguments(args)
- val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
+ val appArgs = new SparkSubmitArguments(args)
+ val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
- conf.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
- conf.get("spark.submit.pyFiles") should (startWith("/"))
+ conf.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
+ conf.get("spark.submit.pyFiles") should (startWith("/"))
- // Verify "spark.submit.pyFiles"
- val args1 = Seq(
- "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
- "--name", "testApp",
- "--master", "yarn",
- "--deploy-mode", "client",
- "--conf", s"spark.submit.pyFiles=s3a://${pyFile.getAbsolutePath}",
- "spark-internal"
- )
+ // Verify "spark.submit.pyFiles"
+ val args1 = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+ "--name", "testApp",
+ "--master", "yarn",
+ "--deploy-mode", "client",
+ "--conf", s"spark.submit.pyFiles=s3a://${pyFile.getAbsolutePath}",
+ "spark-internal"
+ )
- val appArgs1 = new SparkSubmitArguments(args1)
- val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
+ val appArgs1 = new SparkSubmitArguments(args1)
+ val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
- conf1.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
- conf1.get("spark.submit.pyFiles") should (startWith("/"))
+ conf1.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
+ conf1.get("spark.submit.pyFiles") should (startWith("/"))
+ }
}
test("handles natural line delimiters in --properties-file and --conf uniformly") {
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 527c654..c1ae27a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -767,53 +767,54 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
test("clean up stale app information") {
- val storeDir = Utils.createTempDir()
- val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
- val clock = new ManualClock()
- val provider = spy(new FsHistoryProvider(conf, clock))
- val appId = "new1"
-
- // Write logs for two app attempts.
- clock.advance(1)
- val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
- writeFile(attempt1, true, None,
- SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
- SparkListenerJobStart(0, 1L, Nil, null),
- SparkListenerApplicationEnd(5L)
+ withTempDir { storeDir =>
+ val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
+ val clock = new ManualClock()
+ val provider = spy(new FsHistoryProvider(conf, clock))
+ val appId = "new1"
+
+ // Write logs for two app attempts.
+ clock.advance(1)
+ val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+ writeFile(attempt1, true, None,
+ SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
+ SparkListenerJobStart(0, 1L, Nil, null),
+ SparkListenerApplicationEnd(5L)
)
- val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
- writeFile(attempt2, true, None,
- SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
- SparkListenerJobStart(0, 1L, Nil, null),
- SparkListenerApplicationEnd(5L)
+ val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+ writeFile(attempt2, true, None,
+ SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
+ SparkListenerJobStart(0, 1L, Nil, null),
+ SparkListenerApplicationEnd(5L)
)
- updateAndCheck(provider) { list =>
- assert(list.size === 1)
- assert(list(0).id === appId)
- assert(list(0).attempts.size === 2)
- }
-
- // Load the app's UI.
- val ui = provider.getAppUI(appId, Some("1"))
- assert(ui.isDefined)
-
- // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
- // attempt 2 still exists, listing data should be there.
- clock.advance(1)
- attempt1.delete()
- updateAndCheck(provider) { list =>
- assert(list.size === 1)
- assert(list(0).id === appId)
- assert(list(0).attempts.size === 1)
- }
- assert(!ui.get.valid)
- assert(provider.getAppUI(appId, None) === None)
+ updateAndCheck(provider) { list =>
+ assert(list.size === 1)
+ assert(list(0).id === appId)
+ assert(list(0).attempts.size === 2)
+ }
- // Delete the second attempt's log file. Now everything should go away.
- clock.advance(1)
- attempt2.delete()
- updateAndCheck(provider) { list =>
- assert(list.isEmpty)
+ // Load the app's UI.
+ val ui = provider.getAppUI(appId, Some("1"))
+ assert(ui.isDefined)
+
+ // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
+ // attempt 2 still exists, listing data should be there.
+ clock.advance(1)
+ attempt1.delete()
+ updateAndCheck(provider) { list =>
+ assert(list.size === 1)
+ assert(list(0).id === appId)
+ assert(list(0).attempts.size === 1)
+ }
+ assert(!ui.get.valid)
+ assert(provider.getAppUI(appId, None) === None)
+
+ // Delete the second attempt's log file. Now everything should go away.
+ clock.advance(1)
+ attempt2.delete()
+ updateAndCheck(provider) { list =>
+ assert(list.isEmpty)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 3795482..e89733a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -41,18 +41,14 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
}
test("Properties File Arguments Parsing --properties-file") {
- val tmpDir = Utils.createTempDir()
- val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
- try {
+ withTempDir { tmpDir =>
+ val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
Files.write("spark.test.CustomPropertyA blah\n" +
"spark.test.CustomPropertyB notblah\n", outFile, UTF_8)
val argStrings = Array("--properties-file", outFile.getAbsolutePath)
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get("spark.test.CustomPropertyA") === "blah")
assert(conf.get("spark.test.CustomPropertyB") === "notblah")
- } finally {
- Utils.deleteRecursively(tmpDir)
}
}
-
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 62fe0ea..3027865 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -31,14 +31,11 @@ import org.apache.spark.util.Utils
class PersistenceEngineSuite extends SparkFunSuite {
test("FileSystemPersistenceEngine") {
- val dir = Utils.createTempDir()
- try {
+ withTempDir { dir =>
val conf = new SparkConf()
testPersistenceEngine(conf, serializer =>
new FileSystemPersistenceEngine(dir.getAbsolutePath, serializer)
)
- } finally {
- Utils.deleteRecursively(dir)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
index 817dc08..576ca161 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
@@ -59,9 +59,7 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll
test("for small files minimum split size per node and per rack should be less than or equal to " +
"maximum split size.") {
- var dir : File = null;
- try {
- dir = Utils.createTempDir()
+ withTempDir { dir =>
logInfo(s"Local disk address is ${dir.toString}.")
// Set the minsize per node and rack to be larger than the size of the input file.
@@ -75,8 +73,6 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll
}
// ensure spark job runs successfully without exceptions from the CombineFileInputFormat
assert(sc.wholeTextFiles(dir.toString).count == 3)
- } finally {
- Utils.deleteRecursively(dir)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index ddf73d6..4755291 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -89,52 +89,50 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite with BeforeAndAfterAl
* 3) Do the contents match.
*/
test("Correctness of WholeTextFileRecordReader.") {
- val dir = Utils.createTempDir()
- logInfo(s"Local disk address is ${dir.toString}.")
+ withTempDir { dir =>
+ logInfo(s"Local disk address is ${dir.toString}.")
- WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
- createNativeFile(dir, filename, contents, false)
- }
+ WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+ createNativeFile(dir, filename, contents, false)
+ }
- val res = sc.wholeTextFiles(dir.toString, 3).collect()
+ val res = sc.wholeTextFiles(dir.toString, 3).collect()
- assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
- "Number of files read out does not fit with the actual value.")
+ assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+ "Number of files read out does not fit with the actual value.")
- for ((filename, contents) <- res) {
- val shortName = filename.split('/').last
- assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
- s"Missing file name $filename.")
- assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
- s"file $filename contents can not match.")
+ for ((filename, contents) <- res) {
+ val shortName = filename.split('/').last
+ assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+ s"Missing file name $filename.")
+ assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+ s"file $filename contents can not match.")
+ }
}
-
- Utils.deleteRecursively(dir)
}
test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
- val dir = Utils.createTempDir()
- logInfo(s"Local disk address is ${dir.toString}.")
+ withTempDir { dir =>
+ logInfo(s"Local disk address is ${dir.toString}.")
- WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
- createNativeFile(dir, filename, contents, true)
- }
+ WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+ createNativeFile(dir, filename, contents, true)
+ }
- val res = sc.wholeTextFiles(dir.toString, 3).collect()
+ val res = sc.wholeTextFiles(dir.toString, 3).collect()
- assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
- "Number of files read out does not fit with the actual value.")
+ assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+ "Number of files read out does not fit with the actual value.")
- for ((filename, contents) <- res) {
- val shortName = filename.split('/').last.split('.')(0)
+ for ((filename, contents) <- res) {
+ val shortName = filename.split('/').last.split('.')(0)
- assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
- s"Missing file name $filename.")
- assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
- s"file $filename contents can not match.")
+ assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+ s"Missing file name $filename.")
+ assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+ s"file $filename contents can not match.")
+ }
}
-
- Utils.deleteRecursively(dir)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 0ec359d..945b0944 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -470,15 +470,12 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
}
test("zero-partition RDD") {
- val emptyDir = Utils.createTempDir()
- try {
+ withTempDir { emptyDir =>
val file = sc.textFile(emptyDir.getAbsolutePath)
assert(file.partitions.isEmpty)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
- } finally {
- Utils.deleteRecursively(emptyDir)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index a799b1c..5cb2b56 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -822,63 +822,66 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
test("file server") {
- val conf = new SparkConf()
- val tempDir = Utils.createTempDir()
- val file = new File(tempDir, "file")
- Files.write(UUID.randomUUID().toString(), file, UTF_8)
- val fileWithSpecialChars = new File(tempDir, "file name")
- Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
- val empty = new File(tempDir, "empty")
- Files.write("", empty, UTF_8);
- val jar = new File(tempDir, "jar")
- Files.write(UUID.randomUUID().toString(), jar, UTF_8)
-
- val dir1 = new File(tempDir, "dir1")
- assert(dir1.mkdir())
- val subFile1 = new File(dir1, "file1")
- Files.write(UUID.randomUUID().toString(), subFile1, UTF_8)
-
- val dir2 = new File(tempDir, "dir2")
- assert(dir2.mkdir())
- val subFile2 = new File(dir2, "file2")
- Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)
-
- val fileUri = env.fileServer.addFile(file)
- val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
- val emptyUri = env.fileServer.addFile(empty)
- val jarUri = env.fileServer.addJar(jar)
- val dir1Uri = env.fileServer.addDirectory("/dir1", dir1)
- val dir2Uri = env.fileServer.addDirectory("/dir2", dir2)
-
- // Try registering directories with invalid names.
- Seq("/files", "/jars").foreach { uri =>
- intercept[IllegalArgumentException] {
- env.fileServer.addDirectory(uri, dir1)
- }
- }
+ withTempDir { tempDir =>
+ withTempDir { destDir =>
+ val conf = new SparkConf()
+
+ val file = new File(tempDir, "file")
+ Files.write(UUID.randomUUID().toString(), file, UTF_8)
+ val fileWithSpecialChars = new File(tempDir, "file name")
+ Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
+ val empty = new File(tempDir, "empty")
+ Files.write("", empty, UTF_8);
+ val jar = new File(tempDir, "jar")
+ Files.write(UUID.randomUUID().toString(), jar, UTF_8)
+
+ val dir1 = new File(tempDir, "dir1")
+ assert(dir1.mkdir())
+ val subFile1 = new File(dir1, "file1")
+ Files.write(UUID.randomUUID().toString(), subFile1, UTF_8)
+
+ val dir2 = new File(tempDir, "dir2")
+ assert(dir2.mkdir())
+ val subFile2 = new File(dir2, "file2")
+ Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)
+
+ val fileUri = env.fileServer.addFile(file)
+ val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
+ val emptyUri = env.fileServer.addFile(empty)
+ val jarUri = env.fileServer.addJar(jar)
+ val dir1Uri = env.fileServer.addDirectory("/dir1", dir1)
+ val dir2Uri = env.fileServer.addDirectory("/dir2", dir2)
+
+ // Try registering directories with invalid names.
+ Seq("/files", "/jars").foreach { uri =>
+ intercept[IllegalArgumentException] {
+ env.fileServer.addDirectory(uri, dir1)
+ }
+ }
- val destDir = Utils.createTempDir()
- val sm = new SecurityManager(conf)
- val hc = SparkHadoopUtil.get.conf
-
- val files = Seq(
- (file, fileUri),
- (fileWithSpecialChars, fileWithSpecialCharsUri),
- (empty, emptyUri),
- (jar, jarUri),
- (subFile1, dir1Uri + "/file1"),
- (subFile2, dir2Uri + "/file2"))
- files.foreach { case (f, uri) =>
- val destFile = new File(destDir, f.getName())
- Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
- assert(Files.equal(f, destFile))
- }
+ val sm = new SecurityManager(conf)
+ val hc = SparkHadoopUtil.get.conf
+
+ val files = Seq(
+ (file, fileUri),
+ (fileWithSpecialChars, fileWithSpecialCharsUri),
+ (empty, emptyUri),
+ (jar, jarUri),
+ (subFile1, dir1Uri + "/file1"),
+ (subFile2, dir2Uri + "/file2"))
+ files.foreach { case (f, uri) =>
+ val destFile = new File(destDir, f.getName())
+ Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+ assert(Files.equal(f, destFile))
+ }
- // Try to download files that do not exist.
- Seq("files", "jars", "dir1").foreach { root =>
- intercept[Exception] {
- val uri = env.address.toSparkURL + s"/$root/doesNotExist"
- Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+ // Try to download files that do not exist.
+ Seq("files", "jars", "dir1").foreach { root =>
+ intercept[Exception] {
+ val uri = env.address.toSparkURL + s"/$root/doesNotExist"
+ Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 5f4ffa1..ed6a3d9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2831,18 +2831,22 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
test("SPARK-23207: reliable checkpoint can avoid rollback (checkpointed before)") {
- sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
- val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
- shuffleMapRdd.checkpoint()
- shuffleMapRdd.doCheckpoint()
- assertResultStageNotRollbacked(shuffleMapRdd)
+ withTempDir { dir =>
+ sc.setCheckpointDir(dir.getCanonicalPath)
+ val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+ shuffleMapRdd.checkpoint()
+ shuffleMapRdd.doCheckpoint()
+ assertResultStageNotRollbacked(shuffleMapRdd)
+ }
}
test("SPARK-23207: reliable checkpoint fail to rollback (checkpointing now)") {
- sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
- val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
- shuffleMapRdd.checkpoint()
- assertResultStageFailToRollback(shuffleMapRdd)
+ withTempDir { dir =>
+ sc.setCheckpointDir(dir.getCanonicalPath)
+ val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+ shuffleMapRdd.checkpoint()
+ assertResultStageFailToRollback(shuffleMapRdd)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index d6ff5bb..848f702 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -49,11 +49,8 @@ class OutputCommitCoordinatorIntegrationSuite
test("exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
failAfter(Span(60, Seconds)) {
- val tempDir = Utils.createTempDir()
- try {
+ withTempDir { tempDir =>
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
- } finally {
- Utils.deleteRecursively(tempDir)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index a7eed4b..467e490 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -369,27 +369,27 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}
test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
- val dir = Utils.createTempDir()
- val tmpfile = dir.toString + "/RoaringBitmap"
- val outStream = new FileOutputStream(tmpfile)
- val output = new KryoOutput(outStream)
- val bitmap = new RoaringBitmap
- bitmap.add(1)
- bitmap.add(3)
- bitmap.add(5)
- // Ignore Kryo because it doesn't use writeObject
- bitmap.serialize(new KryoOutputObjectOutputBridge(null, output))
- output.flush()
- output.close()
-
- val inStream = new FileInputStream(tmpfile)
- val input = new KryoInput(inStream)
- val ret = new RoaringBitmap
- // Ignore Kryo because it doesn't use readObject
- ret.deserialize(new KryoInputObjectInputBridge(null, input))
- input.close()
- assert(ret == bitmap)
- Utils.deleteRecursively(dir)
+ withTempDir { dir =>
+ val tmpfile = dir.toString + "/RoaringBitmap"
+ val outStream = new FileOutputStream(tmpfile)
+ val output = new KryoOutput(outStream)
+ val bitmap = new RoaringBitmap
+ bitmap.add(1)
+ bitmap.add(3)
+ bitmap.add(5)
+ // Ignore Kryo because it doesn't use writeObject
+ bitmap.serialize(new KryoOutputObjectOutputBridge(null, output))
+ output.flush()
+ output.close()
+
+ val inStream = new FileInputStream(tmpfile)
+ val input = new KryoInput(inStream)
+ val ret = new RoaringBitmap
+ // Ignore Kryo because it doesn't use readObject
+ ret.deserialize(new KryoInputObjectInputBridge(null, input))
+ input.close()
+ assert(ret == bitmap)
+ }
}
test("KryoOutputObjectOutputBridge.writeObject and KryoInputObjectInputBridge.readObject") {
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index eec961a..959cf58 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -132,7 +132,6 @@ class DiskStoreSuite extends SparkFunSuite {
}
test("block data encryption") {
- val testDir = Utils.createTempDir()
val testData = new Array[Byte](128 * 1024)
new Random().nextBytes(testData)
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
index f9e1b79..e48f001 100644
--- a/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/PeriodicRDDCheckpointerSuite.scala
@@ -50,34 +50,34 @@ class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext
}
test("Checkpointing") {
- val tempDir = Utils.createTempDir()
- val path = tempDir.toURI.toString
- val checkpointInterval = 2
- var rddsToCheck = Seq.empty[RDDToCheck]
- sc.setCheckpointDir(path)
- val rdd1 = createRDD(sc)
- val checkpointer = new PeriodicRDDCheckpointer[Double](checkpointInterval, rdd1.sparkContext)
- checkpointer.update(rdd1)
- rdd1.count()
- rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
- checkCheckpoint(rddsToCheck, 1, checkpointInterval)
-
- var iteration = 2
- while (iteration < 9) {
- val rdd = createRDD(sc)
- checkpointer.update(rdd)
- rdd.count()
- rddsToCheck = rddsToCheck :+ RDDToCheck(rdd, iteration)
- checkCheckpoint(rddsToCheck, iteration, checkpointInterval)
- iteration += 1
- }
+ withTempDir { tempDir =>
+ val path = tempDir.toURI.toString
+ val checkpointInterval = 2
+ var rddsToCheck = Seq.empty[RDDToCheck]
+ sc.setCheckpointDir(path)
+ val rdd1 = createRDD(sc)
+ val checkpointer =
+ new PeriodicRDDCheckpointer[Double](checkpointInterval, rdd1.sparkContext)
+ checkpointer.update(rdd1)
+ rdd1.count()
+ rddsToCheck = rddsToCheck :+ RDDToCheck(rdd1, 1)
+ checkCheckpoint(rddsToCheck, 1, checkpointInterval)
+
+ var iteration = 2
+ while (iteration < 9) {
+ val rdd = createRDD(sc)
+ checkpointer.update(rdd)
+ rdd.count()
+ rddsToCheck = rddsToCheck :+ RDDToCheck(rdd, iteration)
+ checkCheckpoint(rddsToCheck, iteration, checkpointInterval)
+ iteration += 1
+ }
- checkpointer.deleteAllCheckpoints()
- rddsToCheck.foreach { rdd =>
- confirmCheckpointRemoved(rdd.rdd)
+ checkpointer.deleteAllCheckpoints()
+ rddsToCheck.foreach { rdd =>
+ confirmCheckpointRemoved(rdd.rdd)
+ }
}
-
- Utils.deleteRecursively(tempDir)
}
}
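[Editor's note] When a single test needs several independent temporary directories (as in the "support glob path" change above), the helper is simply nested. A shape-only sketch, with a placeholder body:

    // Nesting withTempDir for tests that need more than one directory (shape only).
    withTempDir { jarDir =>
      withTempDir { fileDir =>
        // create fixtures under jarDir and fileDir, then run the assertions
      }
    }

Each directory is still deleted when its enclosing block exits, innermost first.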