Posted to commits@spark.apache.org by we...@apache.org on 2023/06/07 02:31:16 UTC

[spark] branch master updated: [SPARK-42750][SQL] Support Insert By Name statement

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6adc67d43d [SPARK-42750][SQL] Support Insert By Name statement
e6adc67d43d is described below

commit e6adc67d43d6beccf21013ee00aa274bed13107c
Author: Jia Fan <fa...@qq.com>
AuthorDate: Wed Jun 7 10:30:59 2023 +0800

    [SPARK-42750][SQL] Support Insert By Name statement
    
    ### What changes were proposed in this pull request?
    
    In some use cases, users have incoming dataframes with fixed column names which might differ from the canonical order. Currently there is no easy way to handle this through the INSERT INTO API: the user has to make sure the columns are in the right order, just as when inserting a tuple of values. We should add an optional BY NAME clause, such that:
    
    `INSERT INTO tgt BY NAME <query>`
    
    takes each column of <query> and inserts it into the column in `tgt` which has the same name according to the configured `resolver` logic.
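
    For illustration, a minimal sketch of the intended behavior (the table and column names here are hypothetical):

    ```sql
    -- Hypothetical target table; USING parquet is just one possible provider.
    CREATE TABLE tgt (c1 INT, c2 INT, c3 INT) USING parquet;
    -- The source query lists its columns in a different order; BY NAME matches
    -- each query output column to the target column with the same name.
    INSERT INTO tgt BY NAME SELECT 2 AS c2, 3 AS c3, 1 AS c1;
    -- tgt now contains the row (c1 = 1, c2 = 2, c3 = 3).
    ```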
    
    Some definitions need to be clarified:
    1. `BY NAME` and a user-specified column list (`INSERT INTO t1 (a,b)` ...) are mutually exclusive
    2. A partition spec can still be used together with `BY NAME`: `INSERT INTO t PARTITION(a=1) BY NAME <query>` (see the sketch below)
    
    `INSERT OVERWRITE ... BY NAME` is not supported yet (it will be supported in a follow-up).
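
    A minimal sketch of combining a partition spec with `BY NAME`, loosely mirroring the new SQLInsertTestSuite cases (the table and column names are hypothetical):

    ```sql
    -- Hypothetical partitioned target table.
    CREATE TABLE t (c1 INT, c2 INT, c3 INT) USING parquet PARTITIONED BY (c3);
    -- The static partition value fixes c3; the remaining query output columns
    -- are matched to the target's data columns by name, not by position.
    INSERT INTO t PARTITION (c3 = 3) BY NAME SELECT 2 AS c2, 1 AS c1;
    ```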
    
    ### Why are the changes needed?
    Adds a new feature: `INSERT INTO ... BY NAME`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add new test.
    
    Closes #40908 from Hisoka-X/SPARK-42750_insert_into_by_name.
    
    Lead-authored-by: Jia Fan <fa...@qq.com>
    Co-authored-by: Hisoka <fa...@qq.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 docs/sql-ref-ansi-compliance.md                    |  3 +-
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |  1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  7 ++--
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 19 ++++++----
 .../sql/catalyst/plans/logical/statements.scala    |  7 +++-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 34 +++++++++++++++++
 .../execution/datasources/DataSourceStrategy.scala |  9 +++--
 .../datasources/FallBackFileSourceV2.scala         |  2 +-
 .../spark/sql/execution/datasources/rules.scala    | 14 ++++---
 .../sql-tests/analyzer-results/explain-aqe.sql.out |  2 +-
 .../sql-tests/analyzer-results/explain.sql.out     |  2 +-
 .../sql-tests/results/ansi/keywords.sql.out        |  1 +
 .../sql-tests/results/explain-aqe.sql.out          |  2 +-
 .../resources/sql-tests/results/explain.sql.out    |  2 +-
 .../resources/sql-tests/results/keywords.sql.out   |  1 +
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  | 43 ++++++++++++++++++++--
 .../execution/command/PlanResolutionSuite.scala    |  4 +-
 .../ThriftServerWithSparkContextSuite.scala        |  2 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala |  8 ++--
 21 files changed, 129 insertions(+), 40 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 76b5d5aef73..f9c6f5ea6aa 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -350,7 +350,7 @@ By default, both `spark.sql.ansi.enabled` and `spark.sql.ansi.enforceReservedKey
 Below is a list of all the keywords in Spark SQL.
 
 |Keyword|Spark SQL<br/>ANSI Mode|Spark SQL<br/>Default Mode|SQL-2016|
-|-------|----------------------|-------------------------|--------|
+|------|----------------------|-------------------------|--------|
 |ADD|non-reserved|non-reserved|non-reserved|
 |AFTER|non-reserved|non-reserved|non-reserved|
 |ALL|reserved|non-reserved|reserved|
@@ -527,6 +527,7 @@ Below is a list of all the keywords in Spark SQL.
 |MONTH|non-reserved|non-reserved|non-reserved|
 |MONTHS|non-reserved|non-reserved|non-reserved|
 |MSCK|non-reserved|non-reserved|non-reserved|
+|NAME|non-reserved|non-reserved|non-reserved|
 |NAMESPACE|non-reserved|non-reserved|non-reserved|
 |NAMESPACES|non-reserved|non-reserved|non-reserved|
 |NANOSECOND|non-reserved|non-reserved|non-reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 6300221b542..ecd5f5912fd 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -264,6 +264,7 @@ MINUTES: 'MINUTES';
 MONTH: 'MONTH';
 MONTHS: 'MONTHS';
 MSCK: 'MSCK';
+NAME: 'NAME';
 NAMESPACE: 'NAMESPACE';
 NAMESPACES: 'NAMESPACES';
 NANOSECOND: 'NANOSECOND';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 20c8df4f79a..89100f2aeec 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -318,7 +318,7 @@ query
 
 insertInto
     : INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF NOT EXISTS)?)?  identifierList?         #insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? identifierList?                 #insertIntoTable
+    | INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? ((BY NAME) | identifierList)?   #insertIntoTable
     | INSERT INTO TABLE? identifierReference REPLACE whereClause                                             #insertIntoReplaceWhere
     | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat?                     #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
@@ -1362,6 +1362,7 @@ ansiNonReserved
     | MONTH
     | MONTHS
     | MSCK
+    | NAME
     | NAMESPACE
     | NAMESPACES
     | NANOSECOND
@@ -1683,6 +1684,7 @@ nonReserved
     | MONTH
     | MONTHS
     | MSCK
+    | NAME
     | NAMESPACE
     | NAMESPACES
     | NANOSECOND
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 51b35498526..aa1b9d0e8fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1078,7 +1078,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _) =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
         val relation = table match {
           case u: UnresolvedRelation if !u.isStreaming =>
             resolveRelation(u).getOrElse(u)
@@ -1278,7 +1278,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
   object ResolveInsertInto extends ResolveInsertionBase {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
       AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _) if i.query.resolved =>
+      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _)
+          if i.query.resolved =>
         // ifPartitionNotExists is append with validation, but validation is not supported
         if (i.ifPartitionNotExists) {
           throw QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name)
@@ -1290,7 +1291,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         } else {
           None
         }
-        val isByName = projectByName.nonEmpty
+        val isByName = projectByName.nonEmpty || i.byName
 
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9124890d4af..e84023ec3df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -166,7 +166,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     // not found first, instead of errors in the input query of the insert command, by doing a
     // top-down traversal.
     plan.foreach {
-      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
+      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
         u.tableNotFound(u.multipartIdentifier)
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 0801cfbda4b..f4170860c24 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -277,10 +277,10 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 
   /**
    * Parameters used for writing query to a table:
-   *   (table ident, tableColumnList, partitionKeys, ifPartitionNotExists).
+   *   (table ident, tableColumnList, partitionKeys, ifPartitionNotExists, byName).
    */
   type InsertTableParams =
-    (IdentifierReferenceContext, Seq[String], Map[String, Option[String]], Boolean)
+    (IdentifierReferenceContext, Seq[String], Map[String, Option[String]], Boolean, Boolean)
 
   /**
    * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider).
@@ -291,7 +291,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    * Add an
    * {{{
    *   INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
-   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec]  [identifierList]
+   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
    *   INSERT INTO [TABLE] tableIdentifier REPLACE whereClause
    *   INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
    *   INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
@@ -307,7 +307,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
       //   2. Write commands do not hold the table logical plan as a child, and we need to add
       //      additional resolution code to resolve identifiers inside the write commands.
       case table: InsertIntoTableContext =>
-        val (relationCtx, cols, partition, ifPartitionNotExists) = visitInsertIntoTable(table)
+        val (relationCtx, cols, partition, ifPartitionNotExists, byName)
+        = visitInsertIntoTable(table)
         withIdentClause(relationCtx, ident => {
           InsertIntoStatement(
             createUnresolvedRelation(relationCtx, ident),
@@ -315,10 +316,12 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
             cols,
             query,
             overwrite = false,
-            ifPartitionNotExists)
+            ifPartitionNotExists,
+            byName)
         })
       case table: InsertOverwriteTableContext =>
-        val (relationCtx, cols, partition, ifPartitionNotExists) = visitInsertOverwriteTable(table)
+        val (relationCtx, cols, partition, ifPartitionNotExists, _)
+        = visitInsertOverwriteTable(table)
         withIdentClause(relationCtx, ident => {
           InsertIntoStatement(
             createUnresolvedRelation(relationCtx, ident),
@@ -358,7 +361,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
       operationNotAllowed("INSERT INTO ... IF NOT EXISTS", ctx)
     }
 
-    (ctx.identifierReference, cols, partitionKeys, false)
+    (ctx.identifierReference, cols, partitionKeys, false, ctx.NAME() != null)
   }
 
   /**
@@ -376,7 +379,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         dynamicPartitionKeys.keys.mkString(", "), ctx)
     }
 
-    (ctx.identifierReference, cols, partitionKeys, ctx.EXISTS() != null)
+    (ctx.identifierReference, cols, partitionKeys, ctx.EXISTS() != null, false)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 9c639a4bce6..669750ee448 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -165,6 +165,8 @@ case class QualifiedColType(
  *                             would have Map('a' -> Some('1'), 'b' -> None).
  * @param ifPartitionNotExists If true, only write if the partition does not exist.
  *                             Only valid for static partitions.
+ * @param byName               If true, reorder the data columns to match the column names of the
+ *                             target table.
  */
 case class InsertIntoStatement(
     table: LogicalPlan,
@@ -172,12 +174,15 @@ case class InsertIntoStatement(
     userSpecifiedCols: Seq[String],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends UnaryParsedStatement {
+    ifPartitionNotExists: Boolean,
+    byName: Boolean = false) extends UnaryParsedStatement {
 
   require(overwrite || !ifPartitionNotExists,
     "IF NOT EXISTS is only valid in INSERT OVERWRITE")
   require(partitionSpec.values.forall(_.nonEmpty) || !ifPartitionNotExists,
     "IF NOT EXISTS is only valid with static partitions")
+  require(userSpecifiedCols.isEmpty || !byName,
+    "BY NAME is only valid without specified cols")
 
   override def child: LogicalPlan = query
   override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 5899f813f14..53635acf0b3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1673,6 +1673,40 @@ class DDLParserSuite extends AnalysisTest {
         stop = 69))
   }
 
+  test("insert table: by name") {
+    Seq(
+      "INSERT INTO TABLE testcat.ns1.ns2.tbl BY NAME SELECT * FROM source",
+      "INSERT INTO testcat.ns1.ns2.tbl BY NAME SELECT * FROM source"
+    ).foreach { sql =>
+      parseCompare(sql,
+        InsertIntoStatement(
+          UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+          Map.empty,
+          Nil,
+          Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
+          overwrite = false, ifPartitionNotExists = false, byName = true))
+    }
+  }
+
+  test("insert table: by name unsupported case") {
+    checkError(
+      exception = parseException("INSERT OVERWRITE TABLE t1 BY NAME SELECT * FROM tmp_view"),
+      errorClass = "PARSE_SYNTAX_ERROR",
+      parameters = Map(
+        "error" -> "'BY'",
+        "hint" -> "")
+    )
+
+    checkError(
+      exception = parseException(
+        "INSERT INTO TABLE t1 BY NAME (c1,c2) SELECT * FROM tmp_view"),
+      errorClass = "PARSE_SYNTAX_ERROR",
+      parameters = Map(
+        "error" -> "'c1'",
+        "hint" -> "")
+    )
+  }
+
   test("delete from table: delete all") {
     parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
       DeleteFromTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 818dc4eb31c..454cc0b5f56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -152,7 +152,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
 
     case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, _),
-        parts, _, query, overwrite, false) if parts.isEmpty =>
+        parts, _, query, overwrite, false, _) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
     case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -164,7 +164,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
 
     case i @ InsertIntoStatement(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, query, overwrite, _)
+        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, query, overwrite, _, _)
         if query.resolved =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
@@ -275,10 +275,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false),
-        _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+        _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta, options))
 
-    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
+    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),
+        _, _, _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case UnresolvedCatalogRelation(tableMeta, options, false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index b5d06db0241..2e1ae9fe3ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(
-        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _) =>
+        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, _) =>
       val v1FileFormat = table.fallbackFileFormat.newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b3fdfc76c7d..0b07ae1d11c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -389,15 +389,16 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
     }
 
     // Create a project if this INSERT has a user-specified column list.
-    val isByName = insert.userSpecifiedCols.nonEmpty
-    val query = if (isByName) {
+    val hasColumnList = insert.userSpecifiedCols.nonEmpty
+    val query = if (hasColumnList) {
       createProjectForByNameQuery(insert)
     } else {
       insert.query
     }
     val newQuery = try {
       TableOutputResolver.resolveOutputColumns(
-        tblName, expectedColumns, query, byName = isByName, conf, supportColDefaultValue = true)
+        tblName, expectedColumns, query, byName = hasColumnList || insert.byName, conf,
+        supportColDefaultValue = true)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
           e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH" =>
@@ -425,7 +426,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(table, _, _, query, _, _) if table.resolved && query.resolved =>
+    case i @ InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved && query.resolved =>
       table match {
         case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
@@ -506,7 +507,8 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
 
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, _, query, _, _) =>
+      case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition,
+          _, query, _, _, _) =>
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
           case LogicalRelation(src, _, _, _) => src
@@ -528,7 +530,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
           case _ => failAnalysis(s"$relation does not allow insertion.")
         }
 
-      case InsertIntoStatement(t, _, _, _, _, _)
+      case InsertIntoStatement(t, _, _, _, _, _, _)
         if !t.isInstanceOf[LeafNode] ||
           t.isInstanceOf[Range] ||
           t.isInstanceOf[OneRowRelation] ||
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
index 8189f1fc7d1..c53642b8ba2 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
@@ -196,7 +196,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val), None)], For
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false, ExtendedMode
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
index 8189f1fc7d1..c53642b8ba2 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
@@ -196,7 +196,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val), None)], For
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false, ExtendedMode
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
index 9952c9aef62..34cc6cc4bd9 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
@@ -180,6 +180,7 @@ MINUTES	false
 MONTH	false
 MONTHS	false
 MSCK	false
+NAME	false
 NAMESPACE	false
 NAMESPACES	false
 NANOSECOND	false
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index d73035aa527..3c2677c936f 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1081,7 +1081,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
 
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 5ac793fed86..f54c6c5e44f 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1023,7 +1023,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
 
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index aa13b029300..41d491c8027 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -180,6 +180,7 @@ MINUTES	false
 MONTH	false
 MONTHS	false
 MSCK	false
+NAME	false
 NAMESPACE	false
 NAMESPACES	false
 NANOSECOND	false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index af85e44519b..1d27904bb2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -51,17 +51,20 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
       input: DataFrame,
       cols: Seq[String] = Nil,
       partitionExprs: Seq[String] = Nil,
-      overwrite: Boolean): Unit = {
+      overwrite: Boolean,
+      byName: Boolean = false): Unit = {
     val tmpView = "tmp_view"
-    val columnList = if (cols.nonEmpty) cols.mkString("(", ",", ")") else ""
     val partitionList = if (partitionExprs.nonEmpty) {
       partitionExprs.mkString("PARTITION (", ",", ")")
     } else ""
     withTempView(tmpView) {
       input.createOrReplaceTempView(tmpView)
       val overwriteStr = if (overwrite) "OVERWRITE" else "INTO"
+      val columnList = if (cols.nonEmpty && !byName) cols.mkString("(", ",", ")") else ""
+      val byNameStr = if (byName) "BY NAME" else ""
       sql(
-        s"INSERT $overwriteStr TABLE $tableName $partitionList $columnList SELECT * FROM $tmpView")
+        s"INSERT $overwriteStr TABLE $tableName $partitionList $byNameStr " +
+          s"$columnList SELECT * FROM $tmpView")
     }
   }
 
@@ -123,6 +126,40 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("insert with column list - by name") {
+    withTable("t1") {
+      val cols = Seq("c1", "c2", "c3")
+      val df = Seq((3, 2, 1)).toDF(cols.reverse: _*)
+      createTable("t1", cols, Seq("int", "int", "int"))
+      processInsert("t1", df, overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+  }
+
+  test("insert with column list - by name + partitioned table") {
+    val cols = Seq("c1", "c2", "c3", "c4")
+    val df = Seq((4, 3, 2, 1)).toDF(cols.reverse: _*)
+    withTable("t1") {
+      createTable("t1", cols, Seq("int", "int", "int", "int"), cols.takeRight(2))
+      processInsert("t1", df, overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+
+    withTable("t1") {
+      createTable("t1", cols, Seq("int", "int", "int", "int"), cols.takeRight(2))
+      processInsert("t1", df.selectExpr("c2", "c1", "c4"),
+        partitionExprs = Seq("c3=3", "c4"), overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+
+    withTable("t1") {
+      createTable("t1", cols, Seq("int", "int", "int", "int"), cols.takeRight(2))
+      processInsert("t1", df.selectExpr("c2", "c1"),
+        partitionExprs = Seq("c3=3", "c4=4"), overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+  }
+
   test("insert with column list - table output reorder + partitioned table") {
     val cols = Seq("c1", "c2", "c3", "c4")
     val df = Seq((1, 2, 3, 4)).toDF(cols: _*)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 7fa3873fc6e..17a0f308a1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1213,7 +1213,7 @@ class PlanResolutionSuite extends AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         UnresolvedInlineTable(_, Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))),
-        _, _) =>
+        _, _, _) =>
 
       case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
@@ -1221,7 +1221,7 @@ class PlanResolutionSuite extends AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _),
-        _, _) =>
+        _, _, _) =>
 
       case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 4d8b7cdf354..aef9dc69656 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -213,7 +213,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
       val sessionHandle = client.openSession(user, "")
       val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS)
       // scalastyle:off line.size.limit
-      assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DA [...]
+      assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DA [...]
       // scalastyle:on line.size.limit
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b2438d38520..3da3d4a0eb5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -145,7 +145,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
 
     // handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its
     // children, hence not matched directly by previous HiveTableRelation case.
-    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _)
+    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _)
       if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       i.copy(table = hiveTableWithStats(relation))
   }
@@ -160,7 +160,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoStatement(
-        r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists)
+        r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists, _)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
         ifPartitionNotExists, query.output.map(_.name))
@@ -226,12 +226,12 @@ case class RelationConversions(
     plan resolveOperators {
       // Write path
       case InsertIntoStatement(
-          r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists)
+          r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName)
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
             (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE))
             && isConvertible(r) =>
         InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), partition, cols,
-          query, overwrite, ifPartitionNotExists)
+          query, overwrite, ifPartitionNotExists, byName)
 
       // Read path
       case relation: HiveTableRelation

