Posted to reviews@spark.apache.org by "gengliangwang (via GitHub)" <gi...@apache.org> on 2023/04/04 23:17:49 UTC

[GitHub] [spark] gengliangwang commented on a diff in pull request #40655: [SPARK-42855][SQL] Use runtime null checks in TableOutputResolver

gengliangwang commented on code in PR #40655:
URL: https://github.com/apache/spark/pull/40655#discussion_r1157855435


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -130,38 +128,93 @@ object TableOutputResolver {
     }
   }
 
+  private def resolveColumnsByPosition(
+      inputCols: Seq[NamedExpression],
+      expectedCols: Seq[Attribute],
+      conf: SQLConf,
+      addError: String => Unit,
+      colPath: Seq[String] = Nil): Seq[NamedExpression] = {
+
+    if (inputCols.size > expectedCols.size) {
+      val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
+        .map(col => s"'${col.name}'")
+        .mkString(", ")
+      addError(s"Cannot write extra fields to struct '${colPath.quoted}': $extraColsStr")
+      return Nil
+    } else if (inputCols.size < expectedCols.size) {
+      val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size)
+        .map(col => s"'${col.name}'")
+        .mkString(", ")
+      addError(s"Struct '${colPath.quoted}' missing fields: $missingColsStr")
+      return Nil
+    }
+
+    inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
+      val newColPath = colPath :+ expectedCol.name
+      (inputCol.dataType, expectedCol.dataType) match {
+        case (inputType: StructType, expectedType: StructType) =>
+          resolveStructType(
+            inputCol, inputType, expectedCol, expectedType,
+            byName = false, conf, addError, newColPath)
+        case (inputType: ArrayType, expectedType: ArrayType) =>
+          resolveArrayType(
+            inputCol, inputType, expectedCol, expectedType,
+            byName = false, conf, addError, newColPath)
+        case (inputType: MapType, expectedType: MapType) =>
+          resolveMapType(
+            inputCol, inputType, expectedCol, expectedType,
+            byName = false, conf, addError, newColPath)
+        case _ =>
+          checkField(expectedCol, inputCol, byName = false, conf, addError, newColPath)
+      }
+    }
+  }
+
   private def checkNullability(
       input: Expression,
       expected: Attribute,
       conf: SQLConf,
-      addError: String => Unit,
-      colPath: Seq[String]): Unit = {
-    if (input.nullable && !expected.nullable &&
-      conf.storeAssignmentPolicy != StoreAssignmentPolicy.LEGACY) {
-      addError(s"Cannot write nullable values to non-null column '${colPath.quoted}'")
+      colPath: Seq[String]): Expression = {
+    if (requiresNullChecks(input, expected, conf)) {
+      AssertNotNull(input, colPath)

Review Comment:
   @cloud-fan good point!
   @aokolnychyi yeah, we can improve it later. A failed insertion won't leave partial records in the target directory anyway.
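   
   A minimal sketch of the runtime-check idea, assuming a hypothetical `needsRuntimeNullCheck` helper (its condition mirrors the removed `checkNullability` lines in the diff; only the `AssertNotNull` wrapping and the store-assignment-policy check come from the PR): a nullable input bound to a non-nullable column gets wrapped in `AssertNotNull`, so a violating row fails the write at execution time instead of the query being rejected during analysis.
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
   import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
   import org.apache.spark.sql.internal.SQLConf
   import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
   
   object RuntimeNullCheckSketch {
     // Hypothetical helper mirroring requiresNullChecks from the diff: a runtime check is
     // needed when a nullable input feeds a non-nullable column and the store assignment
     // policy is not LEGACY.
     def needsRuntimeNullCheck(input: Expression, expected: Attribute, conf: SQLConf): Boolean =
       input.nullable && !expected.nullable &&
         conf.storeAssignmentPolicy != StoreAssignmentPolicy.LEGACY
   
     // Wrap the input so that a null written to the non-nullable column fails at runtime,
     // reporting the full column path.
     def withRuntimeNullCheck(
         input: Expression,
         expected: Attribute,
         conf: SQLConf,
         colPath: Seq[String]): Expression = {
       if (needsRuntimeNullCheck(input, expected, conf)) AssertNotNull(input, colPath) else input
     }
   }
   ```
   
   Under the ANSI/STRICT policies this turns what used to be an analysis-time rejection into a runtime failure, which is why the discussion above is about whether a failed insertion can leave partial output behind.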



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

