You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/02/15 21:21:54 UTC

spark git commit: [SPARK-19329][SQL] Reading from or writing to a datasource table with a non pre-existing location should succeed

Repository: spark
Updated Branches:
  refs/heads/master 59dc26e37 -> 6a9a85b84


[SPARK-19329][SQL] Reading from or writing to a datasource table with a non pre-existing location should succeed

## What changes were proposed in this pull request?

When we insert data into a datasource table using `sqlText` and the table has a location that does not exist,
an exception is thrown.

example:

```
spark.sql("create table t(a string, b int) using parquet")
spark.sql("alter table t set location '/xx'")
spark.sql("insert into table t select 'c', 1")
```

Exception:
```
com.google.common.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.AnalysisException: Path does not exist: /xx;
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4814)
at com.google.common.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4830)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:122)
at org.apache.spark.sql.hive.HiveSessionCatalog.lookupRelation(HiveSessionCatalog.scala:69)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:456)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:465)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:463)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:453)
```

As discussed in the following comments, we should unify the behavior when reading from or writing to a datasource table with a non pre-existing location:

1. reading from a datasource table: return 0 rows
2. writing to a datasource table:  write data successfully

## How was this patch tested?
Unit tests added to `DDLSuite`.

Author: windpiger <so...@outlook.com>

Closes #16672 from windpiger/insertNotExistLocation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a9a85b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a9a85b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a9a85b8

Branch: refs/heads/master
Commit: 6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f
Parents: 59dc26e
Author: windpiger <so...@outlook.com>
Authored: Wed Feb 15 13:21:48 2017 -0800
Committer: Xiao Li <ga...@gmail.com>
Committed: Wed Feb 15 13:21:48 2017 -0800

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala        |   3 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 119 +++++++++++++++++++
 2 files changed, 121 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6a9a85b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index d8a5158..f429232 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -233,7 +233,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             // TODO: improve `InMemoryCatalog` and remove this limitation.
             catalogTable = if (withHiveSupport) Some(table) else None)
 
-        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+          catalogTable = Some(table))
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6a9a85b8/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 278d247..e1a3b24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1832,4 +1832,123 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("insert data to a data source table which has a not existed location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b int)
+             |USING parquet
+             |OPTIONS(path "$dir")
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        dir.delete
+        val tableLocFile = new File(table.location.stripPrefix("file:"))
+        assert(!tableLocFile.exists)
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        Utils.deleteRecursively(dir)
+        assert(!tableLocFile.exists)
+        spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
+        assert(tableLocFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+
+        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        val newDirFile = new File(newDir)
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+        spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
+
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table1.location == newDir)
+        assert(!newDirFile.exists)
+
+        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
+        assert(newDirFile.exists)
+        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
+      }
+    }
+  }
+
+  test("insert into a data source table with no existed partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
+        Utils.deleteRecursively(partLoc)
+        assert(!partLoc.exists())
+        // insert overwrite into a partition which location has been deleted.
+        spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
+        assert(partLoc.exists())
+        checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
+      }
+    }
+  }
+
+  test("read data from a data source table which has a not existed location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b int)
+             |USING parquet
+             |OPTIONS(path "$dir")
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
+        assert(table.location.stripSuffix("/") == expectedPath)
+
+        dir.delete()
+        checkAnswer(spark.table("t"), Nil)
+
+        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
+
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table1.location == newDir)
+        assert(!new File(newDir).exists())
+        checkAnswer(spark.table("t"), Nil)
+      }
+    }
+  }
+
+  test("read data from a data source table with no existed partition location should succeed") {
+    withTable("t") {
+      withTempDir { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a int, b int, c int, d int)
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |LOCATION "$dir"
+           """.stripMargin)
+        spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
+        checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
+
+        // select from a partition which location has been deleted.
+        Utils.deleteRecursively(dir)
+        assert(!dir.exists())
+        spark.sql("REFRESH TABLE t")
+        checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org