You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/03/31 21:17:38 UTC

spark git commit: [SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the `java.io.tmpdir` folder

Repository: spark
Updated Branches:
  refs/heads/master 3cfbeb70b -> e78540282


[SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the `java.io.tmpdir` folder

## What changes were proposed in this pull request?

If I press `CTRL-C` when running these tests, the temp files will be left in `sql/core` folder and I need to delete them manually. It's annoying. This PR just moves the temp files to the `java.io.tmpdir` folder and add a name prefix for them.

## How was this patch tested?

Existing Jenkins tests

Author: Shixiong Zhu <sh...@databricks.com>

Closes #12093 from zsxwing/temp-file.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7854028
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7854028
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7854028

Branch: refs/heads/master
Commit: e785402826dcd984d9312470464714ba6c908a49
Parents: 3cfbeb7
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Mar 31 12:17:25 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Mar 31 12:17:25 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/StreamTest.scala |  2 +-
 .../streaming/ContinuousQueryManagerSuite.scala |  3 ++-
 .../streaming/DataFrameReaderWriterSuite.scala  |  3 ++-
 .../sql/streaming/FileStreamSinkSuite.scala     |  4 +--
 .../sql/streaming/FileStreamSourceSuite.scala   | 26 ++++++++++----------
 .../spark/sql/streaming/FileStressSuite.scala   |  8 +++---
 6 files changed, 24 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 4ca7394..b5be7ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -265,7 +265,7 @@ trait StreamTest extends QueryTest with Timeouts {
     }
 
     val testThread = Thread.currentThread()
-    val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
+    val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
 
     try {
       startedTest.foreach { action =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index 54ce98d..29bd3e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
           @volatile var query: StreamExecution = null
           try {
             val df = ds.toDF
-            val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
+            val metadataRoot =
+              Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
             query = sqlContext
               .streams
               .startQuery(

http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index c1bab9b..102473d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -69,7 +69,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
 class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
   import testImplicits._
 
-  private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
+  private def newMetadataDir =
+    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
 
   after {
     sqlContext.streams.active.foreach(_.stop())

http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 7f31611..8cf5ded 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -29,8 +29,8 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
 
-    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
-    val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
     val query =
       df.write

http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 89de15a..054f5c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -202,8 +202,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("read from text files") {
-    val src = Utils.createTempDir("streaming.src")
-    val tmp = Utils.createTempDir("streaming.tmp")
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
     val textSource = createFileStreamSource("text", src.getCanonicalPath)
     val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -224,8 +224,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("read from json files") {
-    val src = Utils.createTempDir("streaming.src")
-    val tmp = Utils.createTempDir("streaming.tmp")
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
     val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
     val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -258,8 +258,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("read from json files with inferring schema") {
-    val src = Utils.createTempDir("streaming.src")
-    val tmp = Utils.createTempDir("streaming.tmp")
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
     // Add a file so that we can infer its schema
     stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
@@ -279,8 +279,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("read from parquet files") {
-    val src = Utils.createTempDir("streaming.src")
-    val tmp = Utils.createTempDir("streaming.tmp")
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
     val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
     val filtered = fileSource.toDF().filter($"value" contains "keep")
@@ -301,7 +301,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("file stream source without schema") {
-    val src = Utils.createTempDir("streaming.src")
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
 
     // Only "text" doesn't need a schema
     createFileStreamSource("text", src.getCanonicalPath)
@@ -318,8 +318,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("fault tolerance") {
-    val src = Utils.createTempDir("streaming.src")
-    val tmp = Utils.createTempDir("streaming.tmp")
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
     val textSource = createFileStreamSource("text", src.getCanonicalPath)
     val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -346,8 +346,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ
   import testImplicits._
 
   test("file source stress test") {
-    val src = Utils.createTempDir("streaming.src")
-    val tmp = Utils.createTempDir("streaming.tmp")
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
     val textSource = createFileStreamSource("text", src.getCanonicalPath)
     val ds = textSource.toDS[String]().map(_.toInt + 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 5a1bfb3..3916430 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -43,10 +43,10 @@ class FileStressSuite extends StreamTest with SharedSQLContext {
 
   test("fault tolerance stress test") {
     val numRecords = 10000
-    val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
-    val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
-    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
-    val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+    val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
+    val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
     @volatile
     var continue = true


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org