You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/21 05:26:57 UTC

spark git commit: [SPARK-18922][TESTS] Fix new test failures on Windows due to path and resource not closed

Repository: spark
Updated Branches:
  refs/heads/master 339419145 -> 17b93b5fe


[SPARK-18922][TESTS] Fix new test failures on Windows due to path and resource not closed

## What changes were proposed in this pull request?

This PR proposes to fix new test failures on WIndows as below:

**Before**

```
KafkaRelationSuite:
 - test late binding start offsets *** FAILED *** (7 seconds, 679 milliseconds)
   Cause: java.nio.file.FileSystemException: C:\projects\spark\target\tmp\spark-4c4b0cd1-4cb7-4908-949d-1b0cc8addb50\topic-4-0\00000000000000000000.log -> C:\projects\spark\target\tmp\spark-4c4b0cd1-4cb7-4908-949d-1b0cc8addb50\topic-4-0\00000000000000000000.log.deleted: The process cannot access the file because it is being used by another process.

KafkaSourceSuite:
 - deserialization of initial offset with Spark 2.1.0 *** FAILED *** (3 seconds, 542 milliseconds)
   java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-97ef64fc-ae61-4ce3-ac59-287fd38bd824

 - deserialization of initial offset written by Spark 2.1.0 *** FAILED *** (60 milliseconds)
   java.nio.file.InvalidPathException: Illegal char <:> at index 2: /C:/projects/spark/external/kafka-0-10-sql/target/scala-2.11/test-classes/kafka-source-initial-offset-version-2.1.0.b

HiveDDLSuite:
 - partitioned table should always put partition columns at the end of table schema *** FAILED *** (657 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-f1b83d09-850a-4bba-8e43-a2a28dfaa757;

DDLSuite:
 - create a data source table without schema *** FAILED *** (94 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-a3f3c161-afae-4d6f-9182-e8642f77062b;

 - SET LOCATION for managed table *** FAILED *** (219 milliseconds)
   org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
 Exchange SinglePartit
 +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#99367L])
    +- *FileScan parquet default.tbl[] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:projectsspark	arget	mpspark-15be2f2f-4ea9-4c47-bfee-1b7b49363033], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

 - insert data to a data source table which has a not existed location should succeed *** FAILED *** (16 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-34987671-e8d1-4624-ba5b-db1012e1246b;

 - insert into a data source table with no existed partition location should succeed *** FAILED *** (16 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-4c6ccfbf-4091-4032-9fbc-3d40c58267d5;

 - read data from a data source table which has a not existed location should succeed *** FAILED *** (0 milliseconds)

 - read data from a data source table with no existed partition location should succeed *** FAILED *** (0 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-6af39e37-abd1-44e8-ac68-e2dfcf67a2f3;

InputOutputMetricsSuite:
 - output metrics on records written *** FAILED *** (0 milliseconds)
   java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-cd69ee77-88f2-4202-bed6-19c0ee05ef55\InputOutputMetricsSuite, expected: file:///

 - output metrics on records written - new Hadoop API *** FAILED *** (16 milliseconds)
   java.lang.IllegalArgumentException: Wrong FS: file://C:\projects\spark\target\tmp\spark-b69e8fcb-047b-4de8-9cdf-5f026efb6762\InputOutputMetricsSuite, expected: file:///
```

**After**

```
KafkaRelationSuite:
 - test late binding start offsets !!! CANCELED !!! (62 milliseconds)

KafkaSourceSuite:
 - deserialization of initial offset with Spark 2.1.0 (5 seconds, 341 milliseconds)
 - deserialization of initial offset written by Spark 2.1.0 (910 milliseconds)

HiveDDLSuite:
 - partitioned table should always put partition columns at the end of table schema (2 seconds)

DDLSuite:
 - create a data source table without schema (828 milliseconds)
 - SET LOCATION for managed table (406 milliseconds)
 - insert data to a data source table which has a not existed location should succeed (406 milliseconds)
 - insert into a data source table with no existed partition location should succeed (453 milliseconds)
 - read data from a data source table which has a not existed location should succeed (94 milliseconds)
 - read data from a data source table with no existed partition location should succeed (265 milliseconds)

InputOutputMetricsSuite:
 - output metrics on records written (172 milliseconds)
 - output metrics on records written - new Hadoop API (297 milliseconds)
```

## How was this patch tested?

Fixed tests in `InputOutputMetricsSuite`, `KafkaRelationSuite`,  `KafkaSourceSuite`, `DDLSuite.scala` and `HiveDDLSuite`.

Manually tested via AppVeyor as below:

`InputOutputMetricsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/ex8nvwa6tsh7rmto
`KafkaRelationSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/h8dlcowew52y8ncw
`KafkaSourceSuite`: https://ci.appveyor.com/project/spark-test/spark/build/634-20170219-windows-test/job/9ybgjl7yeubxcre4
`DDLSuite`: https://ci.appveyor.com/project/spark-test/spark/build/635-20170219-windows-test
`HiveDDLSuite`: https://ci.appveyor.com/project/spark-test/spark/build/633-20170219-windows-test/job/up6o9n47er087ltb

Author: hyukjinkwon <gu...@gmail.com>

Closes #16999 from HyukjinKwon/windows-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17b93b5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17b93b5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17b93b5f

Branch: refs/heads/master
Commit: 17b93b5feb7595e5b881f288a2774acb5a77e0ab
Parents: 3394191
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Feb 20 21:26:54 2017 -0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 20 21:26:54 2017 -0800

----------------------------------------------------------------------
 .../spark/metrics/InputOutputMetricsSuite.scala |  4 +--
 .../spark/sql/kafka010/KafkaRelationSuite.scala |  4 +++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 20 ++++++------
 .../spark/sql/execution/command/DDLSuite.scala  | 33 ++++++++++----------
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  2 +-
 5 files changed, 35 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17b93b5f/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index becf382..5d52218 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -259,7 +259,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
 
   test("output metrics on records written") {
     val file = new File(tmpDir, getClass.getSimpleName)
-    val filePath = "file://" + file.getAbsolutePath
+    val filePath = file.toURI.toURL.toString
 
     val records = runAndReturnRecordsWritten {
       sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
@@ -269,7 +269,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
 
   test("output metrics on records written - new Hadoop API") {
     val file = new File(tmpDir, getClass.getSimpleName)
-    val filePath = "file://" + file.getAbsolutePath
+    val filePath = file.toURI.toURL.toString
 
     val records = runAndReturnRecordsWritten {
       sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))

http://git-wip-us.apache.org/repos/asf/spark/blob/17b93b5f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 673d60f..68bc3e3 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
 
 class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
 
@@ -147,6 +148,9 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon
   }
 
   test("test late binding start offsets") {
+    // Kafka fails to remove the logs on Windows. See KAFKA-1194.
+    assume(!Utils.isWindows)
+
     var kafkaUtils: KafkaTestUtils = null
     try {
       /**

http://git-wip-us.apache.org/repos/asf/spark/blob/17b93b5f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 4f82b13..534fb77 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.util.Utils
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
@@ -161,11 +162,12 @@ class KafkaSourceSuite extends KafkaSourceTest {
       // Make sure Spark 2.1.0 will throw an exception when reading the new log
       intercept[java.lang.IllegalArgumentException] {
         // Simulate how Spark 2.1.0 reads the log
-        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
-        val length = in.read()
-        val bytes = new Array[Byte](length)
-        in.read(bytes)
-        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+        Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
+          val length = in.read()
+          val bytes = new Array[Byte](length)
+          in.read(bytes)
+          KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+        }
       }
     }
   }
@@ -181,13 +183,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
         "subscribe" -> topic
       )
 
-      val from = Paths.get(
-        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+      val from = new File(
+        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
       val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
       Files.copy(from, to)
 
-      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-        "", parameters)
+      val source = provider.createSource(
+        spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
       val deserializedOffset = source.getOffset.get
       val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
       assert(referenceOffset == deserializedOffset)

http://git-wip-us.apache.org/repos/asf/spark/blob/17b93b5f/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e1a3b24..b44f20e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1520,7 +1520,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
         assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))
 
-        sql(s"CREATE TABLE tab2 using json location '${tempDir.getCanonicalPath}'")
+        sql(s"CREATE TABLE tab2 using json location '${tempDir.toURI}'")
         checkAnswer(spark.table("tab2"), Row("a", "b"))
       }
     }
@@ -1814,7 +1814,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         val defaultTablePath = spark.sessionState.catalog
           .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get
 
-        sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
+        sql(s"ALTER TABLE tbl SET LOCATION '${dir.toURI}'")
         spark.catalog.refreshTable("tbl")
         // SET LOCATION won't move data from previous table path to new table path.
         assert(spark.table("tbl").count() == 0)
@@ -1836,15 +1836,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert data to a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$dir")
+             |OPTIONS(path "$path")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)
 
         dir.delete
         val tableLocFile = new File(table.location.stripPrefix("file:"))
@@ -1859,8 +1859,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         assert(tableLocFile.exists)
         checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
 
-        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
-        val newDirFile = new File(newDir)
+        val newDirFile = new File(dir, "x")
+        val newDir = newDirFile.toURI.toString
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
         spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
 
@@ -1878,16 +1878,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert into a data source table with no existed partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$dir"
+             |LOCATION "$path"
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1906,25 +1906,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("read data from a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$dir")
+             |OPTIONS(path "$path")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)
 
         dir.delete()
         checkAnswer(spark.table("t"), Nil)
 
-        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        val newDirFile = new File(dir, "x")
+        val newDir = newDirFile.toURI.toString
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
 
         val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(table1.location == newDir)
-        assert(!new File(newDir).exists())
+        assert(!newDirFile.exists())
         checkAnswer(spark.table("t"), Nil)
       }
     }
@@ -1938,7 +1939,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$dir"
+             |LOCATION "${dir.toURI}"
            """.stripMargin)
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/17b93b5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c04b9ee..792ac1e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1570,7 +1570,7 @@ class HiveDDLSuite
         val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
         Seq(1 -> 1).toDF("a", "c").write.save(dataPath)
 
-        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'")
+        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'")
         assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org