Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/04/04 00:12:16 UTC

[GitHub] [spark] dtenedor opened a new pull request, #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

dtenedor opened a new pull request, #40652:
URL: https://github.com/apache/spark/pull/40652

   ### What changes were proposed in this pull request?
   
   This PR fixes a correctness bug for INSERT commands with timestamp literals. The bug manifests when:
   
   * An INSERT command includes a user-specified column list with fewer columns than the target table.
   * The provided values include timestamp literals.
   
   The long integer values stored in the rows to represent these timestamp literals were assigned back to `UnresolvedInlineTable` rows without the timestamp type. The analyzer later inserted an implicit cast from `LongType` to `TimestampType`, which changed the value during execution.
   
   This PR fixes the bug by propagating the timestamp type directly to the output table instead.
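   
   To illustrate why the late cast changes the value, here is a minimal sketch in plain Scala with no Spark dependency. It assumes that a timestamp is stored internally as microseconds since the epoch (as the unit test in this PR suggests) and that the implicit cast from `LongType` to `TimestampType` reads its input as seconds. The object and helper names are hypothetical.
   
   ```scala
   import java.time.Instant
   import java.util.concurrent.TimeUnit

   object TimestampCastSketch {
     // Internal storage used in this PR's unit test: microseconds since the epoch.
     val storedMicros: Long = 1609401600000000L

     // Correct reading: the long already holds microseconds.
     def microsToInstant(micros: Long): Instant =
       Instant.ofEpochSecond(
         TimeUnit.MICROSECONDS.toSeconds(micros),
         TimeUnit.MICROSECONDS.toNanos(micros % 1000000L))

     // Assumed behaviour of a long-to-timestamp cast: treat the input as seconds
     // and scale it to microseconds, which inflates an already-scaled value.
     def castSecondsToMicros(seconds: Long): Long =
       TimeUnit.SECONDS.toMicros(seconds)

     def main(args: Array[String]): Unit = {
       println(microsToInstant(storedMicros)) // 2020-12-31T08:00:00Z
       // The re-cast value no longer equals the stored one.
       println(castSecondsToMicros(storedMicros) == storedMicros) // false
     }
   }
   ```
   
   Under these assumptions, keeping the timestamp type attached to the value means no second scaling step is ever applied.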
   
   ### Why are the changes needed?
   
   This PR fixes a correctness bug.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, this PR fixes a correctness bug.
   
   ### How was this patch tested?
   
   This PR adds a new unit test suite.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157747931


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,33 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))

Review Comment:
   Good idea, done.





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157763648


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,33 +271,45 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
-    val newDefaultExpressions: Seq[Expression] =
+    val newDefaultExpressions: Seq[UnresolvedAttribute] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
             row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
           })
+        /*

Review Comment:
   Shall we remove the commented-out code?





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1158755699


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,37 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
-    val newDefaultExpressions: Seq[Expression] =
+    val newDefaultExpressions: Seq[UnresolvedAttribute] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
-      case local: LocalRelation =>
-        // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
-        // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
-        UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
-          local.data.map { row =>
-            val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
-            row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
+      case local: LocalRelation
+        if (numUserSpecifiedColumns > 0 && node.output.size <= numUserSpecifiedColumns) ||

Review Comment:
   You're right, this was confusing. I replaced it with a simpler check at the end of the `getNewDefaultExpressionsForInsert` method (the last five lines are new):
   
   ```
   val numDefaultExpressionsToAdd =
     getStructFieldsForDefaultExpressions(remainingFields).size
     // Limit the number of new DEFAULT expressions to the difference of the number
     // of columns in the target table and the number of provided values in the source
     // relation. This clamps the total final number of provided values to the number
     // of columns in the target table.
     .min(insertTableSchemaWithoutPartitionColumns.size - numProvidedValues)
   ```
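   
   As a worked illustration of the clamp, the following sketch reduces the logic to plain integers. The object, function, and parameter names are hypothetical stand-ins, not the signatures used in `ResolveDefaultColumns`.
   
   ```scala
   object DefaultClampSketch {
     // Number of DEFAULT expressions to append, clamped so that
     // provided values + defaults never exceeds the target table's column count.
     def numDefaultsToAdd(
         numCandidateDefaults: Int,
         numTableColumns: Int,
         numProvidedValues: Int): Int =
       numCandidateDefaults.min(numTableColumns - numProvidedValues)

     def main(args: Array[String]): Unit = {
       // Two-column table, one provided value: one DEFAULT is appended.
       println(numDefaultsToAdd(numCandidateDefaults = 1, numTableColumns = 2, numProvidedValues = 1))
       // Two-column table, two provided values: nothing is appended.
       println(numDefaultsToAdd(numCandidateDefaults = 1, numTableColumns = 2, numProvidedValues = 2))
     }
   }
   ```
   
   Note that this sketch can return a negative number when more values are provided than the table has columns; it does not model how the real rule handles that case.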





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1223358336


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis

Review Comment:
   Hi, @dtenedor and @gengliangwang. I have a question.
   
   Although I understand that this test suite provides test coverage for `org.apache.spark.sql.catalyst.analysis.ResolveDefaultColumns`, that doesn't mean the suite belongs to the `org.apache.spark.sql.catalyst.analysis` package. It lives in the `sql` module and is the only file in this directory:
   
   ```
   $ tree sql/core/src/test/scala/org/apache/spark/sql/catalyst
   sql/core/src/test/scala/org/apache/spark/sql/catalyst
   └── analysis
       └── ResolveDefaultColumnsSuite.scala
   ```
   
   Is this intentional?





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157765220


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,33 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))

Review Comment:
   Sorry, I forgot to uncomment the new code after verifying the new fix worked :) updated now.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,33 +271,45 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
-    val newDefaultExpressions: Seq[Expression] =
+    val newDefaultExpressions: Seq[UnresolvedAttribute] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
             row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
           })
+        /*

Review Comment:
   Sorry, I forgot to uncomment the new code after verifying the new fix worked :) updated now.





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157749445


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,33 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
-            row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
+            val values: Seq[Any] = row.toSeq(colTypes)
+            val dataTypes: Seq[DataType] = colTypes.map(_.dataType)
+            val literals: Seq[Literal] = values.zip(dataTypes).map {
+              case (value, dataType) => Literal(value, dataType)

Review Comment:
   I was able to make this work! The challenge was that this method previously only added new `UnresolvedAttribute("DEFAULT")` expressions, relying on later methods to replace them, and it's not possible to add those to the rows of a `LocalRelation`. I updated it to substitute the default values themselves instead, and it works. This should be simpler and safer than converting back to an `UnresolvedInlineTable`.
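   
   The hunk under review pairs each row value with its column's data type rather than letting the literal infer a type from the raw value. The sketch below models that distinction in plain Scala. `SketchLiteral` and the type objects are hypothetical stand-ins, not Spark classes.
   
   ```scala
   object TypedLiteralSketch {
     sealed trait SketchType
     case object SketchLong extends SketchType
     case object SketchTimestamp extends SketchType

     final case class SketchLiteral(value: Any, dataType: SketchType)

     // Inferring the type from the raw value alone: a Long always looks like a long.
     def inferred(value: Any): SketchLiteral = value match {
       case l: Long => SketchLiteral(l, SketchLong)
       case other => throw new IllegalArgumentException(s"unsupported value: $other")
     }

     // Zipping values with the relation's column types keeps the timestamp type.
     def typed(values: Seq[Any], types: Seq[SketchType]): Seq[SketchLiteral] =
       values.zip(types).map { case (v, t) => SketchLiteral(v, t) }

     def main(args: Array[String]): Unit = {
       val micros = 1609401600000000L
       println(inferred(micros).dataType)
       println(typed(Seq(micros), Seq(SketchTimestamp)).head.dataType)
     }
   }
   ```
   
   In this model, only the second construction lets a later stage know the long is a timestamp, so no implicit cast is needed.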





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157763473


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,33 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))

Review Comment:
   This one is not updated yet





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157637643


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,33 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
-            row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
+            val values: Seq[Any] = row.toSeq(colTypes)
+            val dataTypes: Seq[DataType] = colTypes.map(_.dataType)
+            val literals: Seq[Literal] = values.zip(dataTypes).map {
+              case (value, dataType) => Literal(value, dataType)

Review Comment:
   I wonder if it would be simpler to evaluate the default values into a new row, `defaultValueRow`, and then create a new `LocalRelation` whose rows are each `JoinedRow(dataRow, defaultValueRow)`.
   The current approach converts the `LocalRelation` back to an `UnresolvedInlineTable`...
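   
   One way to picture this suggestion: evaluate the defaults once into a single row, then append that row to every data row while extending the schema. The sketch below uses plain Scala collections. `SketchRelation` is a hypothetical stand-in for `LocalRelation`, and sequence concatenation stands in for `JoinedRow`.
   
   ```scala
   object JoinedRowSketch {
     // Column names with type labels, plus untyped row values, loosely mirroring
     // how a local relation keeps types in its schema rather than in each value.
     final case class SketchRelation(schema: Seq[(String, String)], rows: Seq[Seq[Any]])

     def appendDefaults(
         relation: SketchRelation,
         defaultSchema: Seq[(String, String)],
         defaultValueRow: Seq[Any]): SketchRelation = {
       require(defaultSchema.size == defaultValueRow.size, "one default value per added column")
       SketchRelation(
         schema = relation.schema ++ defaultSchema,
         rows = relation.rows.map(dataRow => dataRow ++ defaultValueRow))
     }

     def main(args: Array[String]): Unit = {
       val data = SketchRelation(Seq("c1" -> "timestamp"), Seq(Seq(1609401600000000L)))
       val result = appendDefaults(data, Seq("c2" -> "timestamp"), Seq(null))
       println(result.schema.map(_._1))
       println(result.rows)
     }
   }
   ```
   
   Because the existing values are never converted back into expressions, their stored form and column types pass through unchanged.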





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157777170


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+
+  test("SPARK-43018: Assign correct types for DEFAULTs with INSERT from a VALUES list") {
+    // This is the internal storage for the timestamp 2020-12-31 00:00:00.0.
+    val literal = Literal(1609401600000000L, TimestampType)
+    val table = UnresolvedInlineTable(
+      names = Seq("attr1"),
+      rows = Seq(Seq(literal)))
+    val node = ResolveInlineTables(table).asInstanceOf[LocalRelation]
+
+    assert(node.output.map(_.dataType) == Seq(TimestampType))
+    assert(node.data.size == 1)
+
+    val insertTableSchemaWithoutPartitionColumns = StructType(Seq(
+      StructField("c1", TimestampType),
+      StructField("c2", TimestampType)))
+    val result = rule.addMissingDefaultValuesForInsertFromInlineTable(
+      node, insertTableSchemaWithoutPartitionColumns, numUserSpecifiedColumns = 1)
+    val relation: LocalRelation = result match {
+      case r: LocalRelation => r
+      case _ => fail(s"invalid result operator type: $result")
+    }
+    assert(relation.output.map(_.name) == Seq("c1", "c2"))
+    val data: Seq[Seq[Any]] = relation.data.map { row =>
+      row.toSeq(StructType(relation.output.map(col => StructField(col.name, col.dataType))))
+    }
+    assert(data == Seq(Seq(literal.value, null)))
+  }
+
+  test("SPARK-43018: INSERT timestamp values into a table with column DEFAULTs") {
+    withTable("t") {
+      sql("create table t(id int, ts timestamp) using parquet")
+      sql("insert into t (ts) values (timestamp'2023-01-01')")
+      val rows = sql("select id, ts from t").collect()
+      assert(rows.size == 1)
+      assert(rows.head.toString == "[42,2023-01-01 00:00:00.0]")

Review Comment:
   We can use `checkAnswer`





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157616471


##########
sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala:
##########


Review Comment:
   I meant
   ```
   sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis
   ```





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1159046300


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,34 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
-    val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newDefaultExpressions: Seq[UnresolvedAttribute] =
+      getNewDefaultExpressionsForInsert(schema, numUserSpecifiedColumns, node.output.size)
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
-        // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
-        // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
-        UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
-          local.data.map { row =>
-            val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
-            row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
+        val newDefaultExpressionsRow = new GenericInternalRow(
+          schema.fields.drop(local.output.size).map {

Review Comment:
   Good question: researching this, I find that this code path only runs when there is a user-specified column list with fewer columns than the target table; otherwise, the above `newDefaultExpressions` is empty and we match the first case in this list instead. So that situation should never reach this case. I added this information to a comment here, and added a unit test case to cover it.
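
The guard-ordering argument above can be sketched in plain Scala. This is a simplified model, not the Catalyst code: `Plan`, `InlineTable`, `Local`, `numDefaultsToAdd`, and `addMissingDefaults` are hypothetical stand-ins, and the rule that defaults are only generated for a non-empty, shorter-than-schema column list is an assumption paraphrased from the comment.

```scala
// Hypothetical stand-ins for Spark's plan nodes; these are NOT the real Catalyst classes.
sealed trait Plan
final case class InlineTable(names: Seq[String], rows: Seq[Seq[Any]]) extends Plan
final case class Local(names: Seq[String], rows: Seq[Seq[Any]]) extends Plan

object DefaultColumnsSketch {
  // Assumed rule, paraphrased from the review comment: defaults are generated only when
  // the INSERT names an explicit column list that is shorter than the table schema.
  def numDefaultsToAdd(schemaSize: Int, numUserSpecifiedColumns: Int): Int =
    if (numUserSpecifiedColumns > 0) math.max(schemaSize - numUserSpecifiedColumns, 0)
    else 0

  def addMissingDefaults(node: Plan, schema: Seq[String], numUserSpecifiedColumns: Int): Plan = {
    val count = numDefaultsToAdd(schema.size, numUserSpecifiedColumns)
    // NULL stands in for each missing DEFAULT value in this model.
    val newDefaults: Seq[Any] = Seq.fill[Any](count)(null)
    node match {
      // The guard comes first: with nothing to add, no later case is ever reached.
      case _ if newDefaults.isEmpty => node
      case t: InlineTable => t.copy(names = schema, rows = t.rows.map(_ ++ newDefaults))
      case l: Local => l.copy(names = schema, rows = l.rows.map(_ ++ newDefaults))
    }
  }
}
```

In this model, a call with `numUserSpecifiedColumns = 0` returns the input node unchanged, so the `Local` branch never sees a row that is shorter than the schema without an explicit column list.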
   





[GitHub] [spark] dtenedor commented on pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #40652:
URL: https://github.com/apache/spark/pull/40652#issuecomment-1496340472

   Hi @gengliangwang here is the correctness bug fix 🙏 




[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157603360


##########
sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.analysis.{ResolveDefaultColumns, ResolveInlineTables, UnresolvedAttribute, UnresolvedInlineTable}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+  def lit(v: Any): Literal = Literal(v)

Review Comment:
   This is not used.





[GitHub] [spark] gengliangwang commented on pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on PR #40652:
URL: https://github.com/apache/spark/pull/40652#issuecomment-1498470838

   Thanks, merging to master/branch-3.4
   cc @xinrong-meng 




[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1158886313


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,34 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
-    val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newDefaultExpressions: Seq[UnresolvedAttribute] =
+      getNewDefaultExpressionsForInsert(schema, numUserSpecifiedColumns, node.output.size)
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
-        // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
-        // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
-        UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
-          local.data.map { row =>
-            val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
-            row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
+        val newDefaultExpressionsRow = new GenericInternalRow(
+          schema.fields.drop(local.output.size).map {

Review Comment:
   What if `numUserSpecifiedColumns` is 0 and `local.output.size` is smaller than the length of the schema?





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157622700


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+
+  test("Assign correct types when adding DEFAULTs for inserting from a VALUES list") {

Review Comment:
   BTW do we have an end-to-end test for this one?





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157749910


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+
+  test("Assign correct types when adding DEFAULTs for inserting from a VALUES list") {

Review Comment:
   Good idea, I added an end-to-end reproduction in this test suite.
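
To illustrate what such an end-to-end reproduction guards against, here is a minimal sketch using only `java.time`, with no Spark dependency. It assumes, as one plausible mechanism for the bug described in this PR, that a long-to-timestamp cast treats its input as seconds and scales it to microseconds. The object and method names are hypothetical, and the `America/Los_Angeles` zone is an assumption about the environment in which the test comment's date was computed.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

object TimestampReinterpretationSketch {
  // Literal from the unit test in this PR: microseconds since the Unix epoch.
  val micros: Long = 1609401600000000L

  // Intended reading: the long already holds microseconds.
  def instantFromMicros(us: Long): Instant = Instant.EPOCH.plus(us, ChronoUnit.MICROS)

  // Assumed faulty reading: the long is taken as seconds and scaled to microseconds.
  // TimeUnit conversions saturate at Long.MaxValue rather than wrapping around.
  def reinterpretAsSeconds(value: Long): Long = TimeUnit.SECONDS.toMicros(value)

  // Wall-clock view in the time zone assumed for the test comment.
  def localTime(us: Long): LocalDateTime =
    LocalDateTime.ofInstant(instantFromMicros(us), ZoneId.of("America/Los_Angeles"))
}
```

Under these assumptions, `localTime(micros)` is midnight on 2020-12-31, while `reinterpretAsSeconds(micros)` no longer equals the stored value, which is the kind of silent change a test against the final table contents would catch.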





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157610703


##########
sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala:
##########


Review Comment:
   Yes, good point! Moved.



##########
sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.analysis.{ResolveDefaultColumns, ResolveInlineTables, UnresolvedAttribute, UnresolvedInlineTable}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+  def lit(v: Any): Literal = Literal(v)

Review Comment:
   Good catch, I removed this.





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157804913


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+
+  test("SPARK-43018: Assign correct types for DEFAULTs with INSERT from a VALUES list") {
+    // This is the internal storage for the timestamp 2020-12-31 00:00:00.0.
+    val literal = Literal(1609401600000000L, TimestampType)
+    val table = UnresolvedInlineTable(
+      names = Seq("attr1"),
+      rows = Seq(Seq(literal)))
+    val node = ResolveInlineTables(table).asInstanceOf[LocalRelation]
+
+    assert(node.output.map(_.dataType) == Seq(TimestampType))
+    assert(node.data.size == 1)
+
+    val insertTableSchemaWithoutPartitionColumns = StructType(Seq(
+      StructField("c1", TimestampType),
+      StructField("c2", TimestampType)))
+    val result = rule.addMissingDefaultValuesForInsertFromInlineTable(
+      node, insertTableSchemaWithoutPartitionColumns, numUserSpecifiedColumns = 1)
+    val relation: LocalRelation = result match {
+      case r: LocalRelation => r
+      case _ => fail(s"invalid result operator type: $result")
+    }
+    assert(relation.output.map(_.name) == Seq("c1", "c2"))
+    val data: Seq[Seq[Any]] = relation.data.map { row =>
+      row.toSeq(StructType(relation.output.map(col => StructField(col.name, col.dataType))))
+    }
+    assert(data == Seq(Seq(literal.value, null)))
+  }
+
+  test("SPARK-43018: INSERT timestamp values into a table with column DEFAULTs") {
+    withTable("t") {
+      sql("create table t(id int, ts timestamp) using parquet")
+      sql("insert into t (ts) values (timestamp'2023-01-01')")
+      val rows = sql("select id, ts from t").collect()
+      assert(rows.size == 1)
+      assert(rows.head.toString == "[42,2023-01-01 00:00:00.0]")

Review Comment:
   Apologies, this was left over from a previous test. I updated this to use `checkAnswer` with the right value instead.
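
For readers following along, a rough sketch of what a `checkAnswer`-based version of this test might look like is shown below. It is not the code that was merged: the suite name is hypothetical, it only compiles inside Spark's own test tree (it relies on `QueryTest` and `SharedSparkSession`), and the expected `NULL` for `id` assumes that a column omitted from the INSERT column list and lacking a declared DEFAULT is filled with NULL.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite name, used here for illustration only.
class InsertTimestampSketchSuite extends QueryTest with SharedSparkSession {
  test("INSERT a timestamp literal with a partial column list") {
    withTable("t") {
      sql("create table t(id int, ts timestamp) using parquet")
      sql("insert into t (ts) values (timestamp'2023-01-01')")
      // Compare typed rows instead of Row.toString output.
      // Assumption: `id` has no DEFAULT, so the missing value is NULL.
      checkAnswer(
        sql("select id, ts from t"),
        Row(null, Timestamp.valueOf("2023-01-01 00:00:00")))
    }
  }
}
```

Comparing `Row` values this way avoids depending on the string rendering of timestamps and reports a readable diff of expected versus actual rows on failure.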





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1223367659


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+  // This is the internal storage for the timestamp 2020-12-31 00:00:00.0.
+  val literal = Literal(1609401600000000L, TimestampType)
+  val table = UnresolvedInlineTable(
+    names = Seq("attr1"),
+    rows = Seq(Seq(literal)))
+  val localRelation = ResolveInlineTables(table).asInstanceOf[LocalRelation]
+
+  def asLocalRelation(result: LogicalPlan): LocalRelation = result match {
+    case r: LocalRelation => r
+    case _ => fail(s"invalid result operator type: $result")
+  }
+
+  test("SPARK-43018: Add DEFAULTs for INSERT from VALUES list with user-defined columns") {
+    // Call the 'addMissingDefaultValuesForInsertFromInlineTable' method with one user-specified
+    // column. We add a default value of NULL to the row as a result.
+    val insertTableSchemaWithoutPartitionColumns = StructType(Seq(
+      StructField("c1", TimestampType),
+      StructField("c2", TimestampType)))
+    val (result: LogicalPlan, _: Boolean) =
+      rule.addMissingDefaultValuesForInsertFromInlineTable(
+        localRelation, insertTableSchemaWithoutPartitionColumns, numUserSpecifiedColumns = 1)
+    val relation = asLocalRelation(result)
+    assert(relation.output.map(_.name) == Seq("c1", "c2"))
+    val data: Seq[Seq[Any]] = relation.data.map { row =>
+      row.toSeq(StructType(relation.output.map(col => StructField(col.name, col.dataType))))
+    }
+    assert(data == Seq(Seq(literal.value, null)))
+  }
+
+  test("SPARK-43018: Add no DEFAULTs for INSERT from VALUES list with no user-defined columns") {
+    // Call the 'addMissingDefaultValuesForInsertFromInlineTable' method with zero user-specified
+    // columns. The table is unchanged because there are no default columns to add in this case.
+    val insertTableSchemaWithoutPartitionColumns = StructType(Seq(
+      StructField("c1", TimestampType),
+      StructField("c2", TimestampType)))
+    val (result: LogicalPlan, _: Boolean) =
+      rule.addMissingDefaultValuesForInsertFromInlineTable(
+        localRelation, insertTableSchemaWithoutPartitionColumns, numUserSpecifiedColumns = 0)
+    assert(asLocalRelation(result) == localRelation)
+  }
+
+  test("SPARK-43018: INSERT timestamp values into a table with column DEFAULTs") {

Review Comment:
   In particular, this test case tests more than `catalyst/analysis`.





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1223394862


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis

Review Comment:
   Hi @dongjoon-hyun, I don't think this is intentional. We could move the `ResolveDefaultColumnsSuite` to the `org.apache.spark.sql` package. What do you think? If you want me to do this, I can prepare a PR.





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157988309


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,37 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
-    val newDefaultExpressions: Seq[Expression] =
+    val newDefaultExpressions: Seq[UnresolvedAttribute] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
-      case local: LocalRelation =>
-        // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
-        // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
-        UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
-          local.data.map { row =>
-            val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
-            row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
+      case local: LocalRelation
+        if (numUserSpecifiedColumns > 0 && node.output.size <= numUserSpecifiedColumns) ||

Review Comment:
   I am confused about the conditions here. Could you add a comment?





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157627009


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,33 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))

Review Comment:
   BTW, can we simplify `colTypes` to `local.schema`?





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157777619


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+
+  test("SPARK-43018: Assign correct types for DEFAULTs with INSERT from a VALUES list") {
+    // This is the internal storage for the timestamp 2020-12-31 00:00:00.0.
+    val literal = Literal(1609401600000000L, TimestampType)
+    val table = UnresolvedInlineTable(
+      names = Seq("attr1"),
+      rows = Seq(Seq(literal)))
+    val node = ResolveInlineTables(table).asInstanceOf[LocalRelation]
+
+    assert(node.output.map(_.dataType) == Seq(TimestampType))
+    assert(node.data.size == 1)
+
+    val insertTableSchemaWithoutPartitionColumns = StructType(Seq(
+      StructField("c1", TimestampType),
+      StructField("c2", TimestampType)))
+    val result = rule.addMissingDefaultValuesForInsertFromInlineTable(
+      node, insertTableSchemaWithoutPartitionColumns, numUserSpecifiedColumns = 1)
+    val relation: LocalRelation = result match {
+      case r: LocalRelation => r
+      case _ => fail(s"invalid result operator type: $result")
+    }
+    assert(relation.output.map(_.name) == Seq("c1", "c2"))
+    val data: Seq[Seq[Any]] = relation.data.map { row =>
+      row.toSeq(StructType(relation.output.map(col => StructField(col.name, col.dataType))))
+    }
+    assert(data == Seq(Seq(literal.value, null)))
+  }
+
+  test("SPARK-43018: INSERT timestamp values into a table with column DEFAULTs") {
+    withTable("t") {
+      sql("create table t(id int, ts timestamp) using parquet")
+      sql("insert into t (ts) values (timestamp'2023-01-01')")
+      val rows = sql("select id, ts from t").collect()
+      assert(rows.size == 1)
+      assert(rows.head.toString == "[42,2023-01-01 00:00:00.0]")

Review Comment:
   BTW, where does the 42 come from? The CREATE TABLE statement does not declare a default value for `id`.
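
The point of the question above can be illustrated with a toy model. This is only a sketch, not Spark code: `Column`, `fillMissing`, and `DefaultFillSketch` are hypothetical names, and the fill rule (user value first, then the declared default, then null) is an assumption based on the first test in this file, which expects `null` for a column that has no default.

```scala
// Toy model (NOT Spark's implementation): fill columns omitted from an INSERT column list.
object DefaultFillSketch {
  // A column with an optional declared DEFAULT value (hypothetical type).
  final case class Column(name: String, default: Option[Any])

  // For each table column, prefer the user-provided value, then the
  // declared default, and fall back to null when neither exists.
  def fillMissing(schema: Seq[Column], provided: Map[String, Any]): Seq[Any] =
    schema.map(col => provided.getOrElse(col.name, col.default.getOrElse(null)))

  def main(args: Array[String]): Unit = {
    val provided = Map[String, Any]("ts" -> "2023-01-01 00:00:00.0")
    // Table declared as in the test: no DEFAULT on either column.
    val noDefault = Seq(Column("id", None), Column("ts", None))
    // Table as it would need to be declared for the row to contain 42.
    val withDefault = Seq(Column("id", Some(42)), Column("ts", None))
    println(fillMissing(noDefault, provided))
    println(fillMissing(withDefault, provided))
  }
}
```

Under this model, the expected row `[42,2023-01-01 00:00:00.0]` would only be produced if `id` were declared with a default of 42; with the statement as written, the model yields null for `id`.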





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157615804


##########
sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.analysis.{ResolveDefaultColumns, ResolveInlineTables, UnresolvedAttribute, UnresolvedInlineTable}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
+
+class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
+  val rule = ResolveDefaultColumns(catalog = null)
+  def lit(v: Any): Literal = Literal(v)

Review Comment:
   Oh I meant
   ```
   sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis
   ```





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157602703


##########
sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala:
##########


Review Comment:
   This should be under `sql/catalyst`, right?





[GitHub] [spark] dtenedor commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157620766


##########
sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala:
##########


Review Comment:
   NP, moved to this package instead.





[GitHub] [spark] gengliangwang commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1157631790


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -271,32 +271,33 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
   /**
    * Updates an inline table to generate missing default column values.
    */
-  private def addMissingDefaultValuesForInsertFromInlineTable(
+  def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
       insertTableSchemaWithoutPartitionColumns: StructType,
       numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
       getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
-    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
-      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
-    } else {
-      schema.fields.map(_.name)
-    }
+    val newNames: Seq[String] = schema.fields.map(_.name)
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
         table.copy(
-          names = table.names ++ newNames,
+          names = newNames,
           rows = table.rows.map { row => row ++ newDefaultExpressions })
       case local: LocalRelation =>
         // Note that we have consumed a LocalRelation but return an UnresolvedInlineTable, because
         // addMissingDefaultValuesForInsertFromProject must replace unresolved DEFAULT references.
         UnresolvedInlineTable(
-          local.output.map(_.name) ++ newNames,
+          newNames,
           local.data.map { row =>
             val colTypes = StructType(local.output.map(col => StructField(col.name, col.dataType)))
-            row.toSeq(colTypes).map(Literal(_)) ++ newDefaultExpressions
+            val values: Seq[Any] = row.toSeq(colTypes)
+            val dataTypes: Seq[DataType] = colTypes.map(_.dataType)
+            val literals: Seq[Literal] = values.zip(dataTypes).map {
+              case (value, dataType) => Literal(value, dataType)

Review Comment:
   Since we are touching the code here, do we need to handle array/map/struct types?
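
For context on why the diff above pairs each value with its data type: the PR description says the raw long behind a timestamp literal was later cast from `LongType` to `TimestampType`, which changed the value. The sketch below uses only `java.time` and is not Spark code. It assumes the stored long is microseconds since the epoch and that the implicit cast reads a long as seconds since the epoch; `TimestampMicrosSketch`, `fromMicros`, and `fromSeconds` are hypothetical names. The constant is the literal from the unit test quoted earlier in this thread, and the `America/Los_Angeles` zone is an assumption about the test environment.

```scala
import java.time.{Instant, ZoneId}
import java.time.temporal.ChronoUnit

// Sketch (NOT Spark code): one long value, two interpretations.
object TimestampMicrosSketch {
  // Literal from the unit test quoted earlier in this thread.
  val micros: Long = 1609401600000000L

  // Intended reading: microseconds since the Unix epoch.
  def fromMicros(us: Long): Instant = Instant.EPOCH.plus(us, ChronoUnit.MICROS)

  // Assumed behavior of a long-to-timestamp cast: seconds since the epoch.
  def fromSeconds(s: Long): Instant = Instant.ofEpochSecond(s)

  def main(args: Array[String]): Unit = {
    val zone = ZoneId.of("America/Los_Angeles") // assumed test time zone
    // Local date-time when the value keeps its timestamp meaning.
    println(fromMicros(micros).atZone(zone).toLocalDateTime)
    // Year reached when the same long is re-read as seconds.
    println(fromSeconds(micros).atZone(zone).getYear)
  }
}
```

Read as microseconds, the value lands on midnight of 2020-12-31 in the assumed zone, matching the comment in the test. Read as seconds, it lands millions of years in the future, which is the kind of silent change that carrying the type alongside the value avoids.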





[GitHub] [spark] gengliangwang closed pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang closed pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals
URL: https://github.com/apache/spark/pull/40652




[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40652: [SPARK-43018][SQL] Fix bug for INSERT commands with timestamp literals

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #40652:
URL: https://github.com/apache/spark/pull/40652#discussion_r1223414585


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis

Review Comment:
   Thank you. I made a PR.
   - https://github.com/apache/spark/pull/41520


