Posted to commits@spark.apache.org by li...@apache.org on 2018/02/08 08:08:58 UTC

spark git commit: [SPARK-23348][SQL] append data using saveAsTable should adjust the data types

Repository: spark
Updated Branches:
  refs/heads/master 3473fda6d -> 7f5f5fb12


[SPARK-23348][SQL] append data using saveAsTable should adjust the data types

## What changes were proposed in this pull request?

When inserting/appending data into an existing table, Spark should adjust the data types of the input query to match the table schema, or fail fast if the input types cannot be cast to the table schema.

There are several ways to insert/append data: the SQL API, `DataFrameWriter.insertInto`, and `DataFrameWriter.saveAsTable`. The first two create an `InsertIntoTable` plan, while the last creates a `CreateTable` plan. However, we only adjust the input query data types for `InsertIntoTable`, so users may hit confusing errors when appending data using `saveAsTable`. See the JIRA for the error case.
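
For illustration (not from the original commit message), a minimal sketch of the three paths in a `spark-shell` session, assuming an existing table `t(i INT, j STRING)`:

```scala
// Sketch only: three ways to append to an existing table t(i INT, j STRING).
import spark.implicits._

// 1) SQL API -- planned as InsertIntoTable
spark.sql("INSERT INTO t SELECT 2, 'b'")

// 2) DataFrameWriter.insertInto -- also planned as InsertIntoTable
Seq(3 -> "c").toDF("i", "j").write.insertInto("t")

// 3) DataFrameWriter.saveAsTable in append mode -- planned as CreateTable,
//    the path that previously skipped the type adjustment
Seq(4 -> "d").toDF("i", "j").write.mode("append").saveAsTable("t")
```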

This PR fixes the bug by adjusting the input query data types for `CreateTable` as well.
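
As a concrete example (adapted from the test added in this patch, not a verbatim reproduction of the JIRA report), appending a DataFrame whose column types are swapped relative to the table schema now goes through the same cast-and-rename step as `INSERT INTO`:

```scala
// Adapted from the new DDLSuite test below; assumes a spark-shell session.
import spark.implicits._

Seq(1 -> "a").toDF("i", "j").write.saveAsTable("t")   // t: i INT, j STRING
// Input columns are (STRING, INT); with this fix they are cast to (INT, STRING).
Seq("c" -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
spark.table("t").show()   // Row(1, "a") and Row(null, "3"): "c" is not a valid INT, so it casts to null
```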

## How was this patch tested?

A new test in `DDLSuite`.

Author: Wenchen Fan <we...@databricks.com>

Closes #20527 from cloud-fan/saveAsTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f5f5fb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f5f5fb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f5f5fb1

Branch: refs/heads/master
Commit: 7f5f5fb1296275a38da0adfa05125dd8ebf729ff
Parents: 3473fda
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Feb 8 00:08:54 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Feb 8 00:08:54 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/rules.scala | 72 +++++++++++---------
 .../spark/sql/execution/command/DDLSuite.scala  | 28 ++++++++
 2 files changed, 69 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f5f5fb1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5dbcf4a..5cc21ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -178,7 +178,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
 
       c.copy(
         tableDesc = existingTable,
-        query = Some(newQuery))
+        query = Some(DDLPreprocessingUtils.castAndRenameQueryOutput(
+          newQuery, existingTable.schema.toAttributes, conf)))
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
@@ -316,7 +317,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
  * table. It also does data type casting and field renaming, to make sure that the columns to be
  * inserted have the correct data type and fields have the correct names.
  */
-case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
+case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
   private def preprocess(
       insert: InsertIntoTable,
       tblName: String,
@@ -336,6 +337,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
           s"including ${staticPartCols.size} partition column(s) having constant value(s).")
     }
 
+    val newQuery = DDLPreprocessingUtils.castAndRenameQueryOutput(
+      insert.query, expectedColumns, conf)
     if (normalizedPartSpec.nonEmpty) {
       if (normalizedPartSpec.size != partColNames.length) {
         throw new AnalysisException(
@@ -346,37 +349,11 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
            """.stripMargin)
       }
 
-      castAndRenameChildOutput(insert.copy(partition = normalizedPartSpec), expectedColumns)
+      insert.copy(query = newQuery, partition = normalizedPartSpec)
     } else {
       // All partition columns are dynamic because the InsertIntoTable command does
       // not explicitly specify partitioning columns.
-      castAndRenameChildOutput(insert, expectedColumns)
-        .copy(partition = partColNames.map(_ -> None).toMap)
-    }
-  }
-
-  private def castAndRenameChildOutput(
-      insert: InsertIntoTable,
-      expectedOutput: Seq[Attribute]): InsertIntoTable = {
-    val newChildOutput = expectedOutput.zip(insert.query.output).map {
-      case (expected, actual) =>
-        if (expected.dataType.sameType(actual.dataType) &&
-            expected.name == actual.name &&
-            expected.metadata == actual.metadata) {
-          actual
-        } else {
-          // Renaming is needed for handling the following cases like
-          // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
-          // 2) Target tables have column metadata
-          Alias(cast(actual, expected.dataType), expected.name)(
-            explicitMetadata = Option(expected.metadata))
-        }
-    }
-
-    if (newChildOutput == insert.query.output) {
-      insert
-    } else {
-      insert.copy(query = Project(newChildOutput, insert.query))
+      insert.copy(query = newQuery, partition = partColNames.map(_ -> None).toMap)
     }
   }
 
@@ -491,3 +468,36 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
     }
   }
 }
+
+object DDLPreprocessingUtils {
+
+  /**
+   * Adjusts the name and data type of the input query output columns, to match the expectation.
+   */
+  def castAndRenameQueryOutput(
+      query: LogicalPlan,
+      expectedOutput: Seq[Attribute],
+      conf: SQLConf): LogicalPlan = {
+    val newChildOutput = expectedOutput.zip(query.output).map {
+      case (expected, actual) =>
+        if (expected.dataType.sameType(actual.dataType) &&
+          expected.name == actual.name &&
+          expected.metadata == actual.metadata) {
+          actual
+        } else {
+          // Renaming is needed for handling the following cases like
+          // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
+          // 2) Target tables have column metadata
+          Alias(
+            Cast(actual, expected.dataType, Option(conf.sessionLocalTimeZone)),
+            expected.name)(explicitMetadata = Option(expected.metadata))
+        }
+    }
+
+    if (newChildOutput == query.output) {
+      query
+    } else {
+      Project(newChildOutput, query)
+    }
+  }
+}
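
A note for readers of the hunk above (not part of the commit): when names, types, or metadata differ, `castAndRenameQueryOutput` wraps the input query in a `Project` of casts and aliases, using `Cast` with the session-local time zone. For a table `t(i INT, j STRING)` and an input producing `(STRING, INT)`, the rewrite is roughly equivalent to spelling the casts out by hand:

```scala
// Rough equivalent of the injected Project, expressed through the public SQL API.
// The literal values 'c' and 3 are made up for the example; assumes a SparkSession `spark`.
val rewritten = spark.sql("SELECT CAST('c' AS INT) AS i, CAST(3 AS STRING) AS j")
rewritten.printSchema()   // i: int, j: string
rewritten.show()          // [null, 3] -- 'c' is not a valid INT, so the cast yields null
```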

http://git-wip-us.apache.org/repos/asf/spark/blob/7f5f5fb1/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ee3674b..f76bfd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -37,6 +37,8 @@ import org.apache.spark.util.Utils
 
 
 class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
+  import testImplicits._
+
   override def afterEach(): Unit = {
     try {
       // drop all databases, tables and functions after each test
@@ -132,6 +134,32 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
       checkAnswer(spark.table("t"), Row(Row("a", 1)) :: Nil)
     }
   }
+
+  // TODO: This test is copied from HiveDDLSuite, unify it later.
+  test("SPARK-23348: append data to data source table with saveAsTable") {
+    withTable("t", "t1") {
+      Seq(1 -> "a").toDF("i", "j").write.saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      Seq(3 -> "c").toDF("i", "j").write.mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+      Seq("c" -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
+        :: Row(null, "3") :: Nil)
+
+      Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
+
+      val e = intercept[AnalysisException] {
+        Seq(5 -> "e").toDF("i", "j").write.mode("append").format("json").saveAsTable("t1")
+      }
+      assert(e.message.contains("The format of the existing table default.t1 is " +
+        "`ParquetFileFormat`. It doesn't match the specified format `JsonFileFormat`."))
+    }
+  }
 }
 
 abstract class DDLSuite extends QueryTest with SQLTestUtils {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org