You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/09/05 03:28:29 UTC

spark git commit: [SPARK-17393][SQL] Error Handling when CTAS Against the Same Data Source Table Using Overwrite Mode

Repository: spark
Updated Branches:
  refs/heads/master 1b001b520 -> c1e9a6d27


[SPARK-17393][SQL] Error Handling when CTAS Against the Same Data Source Table Using Overwrite Mode

### What changes were proposed in this pull request?
When we try to read a table and then write to the same table using the `Overwrite` save mode, we get a very confusing error message.
For example:
```Scala
      Seq((1, 2)).toDF("i", "j").write.saveAsTable("tab1")
      table("tab1").write.mode(SaveMode.Overwrite).saveAsTable("tab1")
```

```
Job aborted.
org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp
...
Caused by: org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.sql.execution.datasources
```

After the PR, we will issue an `AnalysisException`:
```
Cannot overwrite table `tab1` that is also being read from
```
### How was this patch tested?
Added test cases.

Author: gatorsmile <ga...@gmail.com>

Closes #14954 from gatorsmile/ctasQueryAnalyze.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1e9a6d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1e9a6d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1e9a6d2

Branch: refs/heads/master
Commit: c1e9a6d274c281ec30e6d022eedfbe3a2988f721
Parents: 1b001b5
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Sep 5 11:28:19 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Sep 5 11:28:19 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/rules.scala | 45 +++++++----------
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 52 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1e9a6d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index f14c63c..ae77e4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -304,6 +304,25 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
             s"metastore. Metastore only accepts table name containing characters, numbers and _.")
         }
+        if (query.isDefined &&
+          mode == SaveMode.Overwrite &&
+          catalog.tableExists(tableDesc.identifier)) {
+          // Need to remove SubQuery operator.
+          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
+            // Only do the check if the table is a data source table
+            // (the relation is a BaseRelation).
+            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
+              // Get all input data source relations of the query.
+              val srcRelations = query.get.collect {
+                case LogicalRelation(src: BaseRelation, _, _) => src
+              }
+              if (srcRelations.contains(dest)) {
+                failAnalysis(
+                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
+              }
+            case _ => // OK
+          }
+        }
 
       case i @ logical.InsertIntoTable(
         l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -357,32 +376,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
-      case CreateTable(tableDesc, mode, Some(query)) =>
-        // When the SaveMode is Overwrite, we need to check if the table is an input table of
-        // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) {
-          // Need to remove SubQuery operator.
-          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
-            // Only do the check if the table is a data source table
-            // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
-              // Get all input data source relations of the query.
-              val srcRelations = query.collect {
-                case LogicalRelation(src: BaseRelation, _, _) => src
-              }
-              if (srcRelations.contains(dest)) {
-                failAnalysis(
-                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.")
-              } else {
-                // OK
-              }
-
-            case _ => // OK
-          }
-        } else {
-          // OK
-        }
-
       case _ => // OK
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e9a6d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7a71475..3466733 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1151,6 +1151,58 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
+  test("saveAsTable - source and target are the same table") {
+    val tableName = "tab1"
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+      table(tableName).write.mode(SaveMode.Append).saveAsTable(tableName)
+      checkAnswer(table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+
+      table(tableName).write.mode(SaveMode.Ignore).saveAsTable(tableName)
+      checkAnswer(table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+
+      var e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+      }.getMessage
+      assert(e.contains(s"Cannot overwrite table `$tableName` that is also being read from"))
+
+      e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
+      }.getMessage
+      assert(e.contains(s"Table `$tableName` already exists"))
+    }
+  }
+
+  test("insertInto - source and target are the same table") {
+    val tableName = "tab1"
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
+
+      table(tableName).write.mode(SaveMode.Append).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2)))
+
+      table(tableName).write.mode(SaveMode.Ignore).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+      table(tableName).write.mode(SaveMode.ErrorIfExists).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
+
+      val e = intercept[AnalysisException] {
+        table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
+      }.getMessage
+      assert(e.contains(s"Cannot overwrite a path that is also being read from"))
+    }
+  }
+
   test("saveAsTable[append]: less columns") {
     withTable("saveAsTable_less_columns") {
       Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org