Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/10 09:25:02 UTC

spark git commit: [SPARK-15899][SQL] Fix the construction of the file path with hadoop Path

Repository: spark
Updated Branches:
  refs/heads/master b9f8a1170 -> 11a6844be


[SPARK-15899][SQL] Fix the construction of the file path with hadoop Path

## What changes were proposed in this pull request?

Fix the construction of the file path by building it with Hadoop's Path. The previous string-based construction produced incorrect paths on Windows.
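
For illustration, a minimal sketch of the problem and the fix (the Windows
value of user.dir below is a hypothetical example, not taken from the patch):

    import org.apache.hadoop.fs.Path

    // On Windows, System.getProperty("user.dir") returns a path with a
    // drive letter and backslashes, e.g. "C:\Users\alice".
    val dir = "C:\\Users\\alice"

    // Before: plain string concatenation produced a malformed location
    // such as "file:C:\Users\alice/spark-warehouse".
    val broken = "file:" + dir + "/spark-warehouse"

    // After: Hadoop's Path normalizes the separators when running on
    // Windows, yielding e.g. "C:/Users/alice/spark-warehouse".
    val fixed = new Path(dir + "/spark-warehouse").toString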

## How was this patch tested?

Ran the SQL unit tests on Windows.

Author: avulanov <na...@yandex.ru>

Closes #13868 from avulanov/SPARK-15899-file.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11a6844b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11a6844b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11a6844b

Branch: refs/heads/master
Commit: 11a6844bebbad1968bcdc295ab2de31c60dc0874
Parents: b9f8a11
Author: avulanov <na...@yandex.ru>
Authored: Wed Aug 10 10:25:00 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Aug 10 10:25:00 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  5 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 53 +++++++++++---------
 .../spark/sql/internal/SQLConfSuite.scala       |  4 +-
 3 files changed, 35 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11a6844b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2286919..b867a65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 
+import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.internal.Logging
@@ -55,7 +56,7 @@ object SQLConf {
   val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
     .doc("The default location for managed databases and tables.")
     .stringConf
-    .createWithDefault("file:${system:user.dir}/spark-warehouse")
+    .createWithDefault("${system:user.dir}/spark-warehouse")
 
   val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
     .internal()
@@ -679,7 +680,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
 
-  def warehousePath: String = getConf(WAREHOUSE_PATH)
+  def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString
 
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/11a6844b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0eb3f20..e14e84e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -111,10 +111,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
-  private def appendTrailingSlash(path: String): String = {
-    if (!path.endsWith(File.separator)) path + File.separator else path
-  }
-
   test("the qualified path of a database is stored in the catalog") {
     val catalog = spark.sessionState.catalog
 
@@ -122,18 +118,19 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val path = tmpDir.toString
       // The generated temp path is not qualified.
       assert(!path.startsWith("file:/"))
-      sql(s"CREATE DATABASE db1 LOCATION '$path'")
+      val uri = tmpDir.toURI
+      sql(s"CREATE DATABASE db1 LOCATION '$uri'")
       val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
       assert("file" === pathInCatalog.getScheme)
-      val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path
-      assert(expectedPath === pathInCatalog.getPath)
+      val expectedPath = new Path(path).toUri
+      assert(expectedPath.getPath === pathInCatalog.getPath)
 
       withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) {
         sql(s"CREATE DATABASE db2")
-        val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
-        assert("file" === pathInCatalog.getScheme)
-        val expectedPath = appendTrailingSlash(spark.sessionState.conf.warehousePath) + "db2.db"
-        assert(expectedPath === pathInCatalog.getPath)
+        val pathInCatalog2 = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+        assert("file" === pathInCatalog2.getScheme)
+        val expectedPath2 = new Path(spark.sessionState.conf.warehousePath + "/" + "db2.db").toUri
+        assert(expectedPath2.getPath === pathInCatalog2.getPath)
       }
 
       sql("DROP DATABASE db1")
@@ -141,6 +138,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  private def makeQualifiedPath(path: String): String = {
+    // copy-paste from SessionCatalog
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
+    fs.makeQualified(hadoopPath).toString
+  }
+
   test("Create/Drop Database") {
     withTempDir { tmpDir =>
       val path = tmpDir.toString
@@ -154,8 +158,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
             sql(s"CREATE DATABASE $dbName")
             val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-            val expectedLocation =
-              "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+            val expectedLocation = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
             assert(db1 == CatalogDatabase(
               dbNameWithoutBackTicks,
               "",
@@ -181,8 +184,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         sql(s"CREATE DATABASE $dbName")
         val db1 = catalog.getDatabaseMetadata(dbName)
         val expectedLocation =
-          "file:" + appendTrailingSlash(System.getProperty("user.dir")) +
-            s"spark-warehouse/$dbName.db"
+          makeQualifiedPath(s"${System.getProperty("user.dir")}/spark-warehouse" +
+            "/" + s"$dbName.db")
         assert(db1 == CatalogDatabase(
           dbName,
           "",
@@ -200,17 +203,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val catalog = spark.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")
     withTempDir { tmpDir =>
-      val path = tmpDir.toString
-      val dbPath = "file:" + path
+      val path = new Path(tmpDir.toString).toUri.toString
       databaseNames.foreach { dbName =>
         try {
           val dbNameWithoutBackTicks = cleanIdentifier(dbName)
           sql(s"CREATE DATABASE $dbName Location '$path'")
           val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+          val expPath = makeQualifiedPath(tmpDir.toString)
           assert(db1 == CatalogDatabase(
             dbNameWithoutBackTicks,
             "",
-            if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
+            expPath,
             Map.empty))
           sql(s"DROP DATABASE $dbName CASCADE")
           assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -233,8 +236,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
             val dbNameWithoutBackTicks = cleanIdentifier(dbName)
             sql(s"CREATE DATABASE $dbName")
             val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-            val expectedLocation =
-              "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+            val expectedLocation = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
             assert(db1 == CatalogDatabase(
               dbNameWithoutBackTicks,
               "",
@@ -263,12 +265,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val partitionClause =
         userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
       val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
+      val uri = path.toURI
       sql(
         s"""
            |CREATE TABLE $tabName $schemaClause
            |USING parquet
            |OPTIONS (
-           |  path '$path'
+           |  path '$uri'
            |)
            |$partitionClause
          """.stripMargin)
@@ -404,6 +407,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val path = dir.getCanonicalPath
       val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
       df.write.format("json").save(path)
+      val uri = dir.toURI
 
       withTable(tabName) {
         sql(
@@ -411,7 +415,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |CREATE TABLE $tabName
              |USING json
              |OPTIONS (
-             |  path '$path'
+             |  path '$uri'
              |)
            """.stripMargin)
 
@@ -444,6 +448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         .add("col2", StringType).add("col4", LongType)
         .add("col1", IntegerType).add("col3", IntegerType)
       val partitionCols = Seq("col1", "col3")
+      val uri = dir.toURI
 
       withTable(tabName) {
         spark.sql(
@@ -451,7 +456,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |CREATE TABLE $tabName
              |USING json
              |OPTIONS (
-             |  path '$path'
+             |  path '$uri'
              |)
            """.stripMargin)
         val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
@@ -511,7 +516,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         databaseNames.foreach { dbName =>
           try {
             val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-            val location = "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
+            val location = makeQualifiedPath(path + "/" + s"$dbNameWithoutBackTicks.db")
 
             sql(s"CREATE DATABASE $dbName")
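
The test changes above replace hand-rolled "file:" + path concatenation with
two URI-safe constructions: java.io.File.toURI for the LOCATION and path
clauses, and the makeQualifiedPath helper, which defers to the Hadoop
FileSystem. A minimal standalone sketch of what each produces (the example
path and the default Configuration are assumptions, not from the suite):

    import java.io.File
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // File.toURI yields a well-formed file: URI on any platform, e.g.
    // file:/tmp/db1 on Linux or file:/C:/Users/alice/db1 on Windows.
    val uri = new File("/tmp/db1").toURI

    // FileSystem.makeQualified resolves the path against the default
    // filesystem, adding the scheme the catalog records, e.g. "file:/tmp/db1".
    val hadoopPath = new Path("/tmp/db1")
    val fs = hadoopPath.getFileSystem(new Configuration())
    val qualified = fs.makeQualified(hadoopPath).toString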
 

http://git-wip-us.apache.org/repos/asf/spark/blob/11a6844b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 7424e17..3c60b23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.internal
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
@@ -214,7 +216,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
       // to get the default value, always unset it
       spark.conf.unset(SQLConf.WAREHOUSE_PATH.key)
       assert(spark.sessionState.conf.warehousePath
-        === s"file:${System.getProperty("user.dir")}/spark-warehouse")
+        === new Path(s"${System.getProperty("user.dir")}/spark-warehouse").toString)
     } finally {
       sql(s"set ${SQLConf.WAREHOUSE_PATH}=$original")
     }
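
With both sides of the assertion built through Hadoop's Path, the expected
default now matches the getter on every platform. An illustrative
restatement of the check, assuming the suite's SparkSession named spark:

    import org.apache.hadoop.fs.Path

    val expected =
      new Path(s"${System.getProperty("user.dir")}/spark-warehouse").toString
    assert(spark.sessionState.conf.warehousePath === expected)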

