Posted to reviews@spark.apache.org by "cloud-fan (via GitHub)" <gi...@apache.org> on 2023/04/04 07:35:59 UTC

[GitHub] [spark] cloud-fan commented on a diff in pull request #40655: [SPARK-42855][SQL] Use runtime null checks in TableOutputResolver

cloud-fan commented on code in PR #40655:
URL: https://github.com/apache/spark/pull/40655#discussion_r1156848413


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -130,38 +128,93 @@ object TableOutputResolver {
     }
   }
 
+  private def resolveColumnsByPosition(
+      inputCols: Seq[NamedExpression],
+      expectedCols: Seq[Attribute],
+      conf: SQLConf,
+      addError: String => Unit,
+      colPath: Seq[String] = Nil): Seq[NamedExpression] = {
+
+    if (inputCols.size > expectedCols.size) {
+      val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
+        .map(col => s"'${col.name}'")
+        .mkString(", ")
+      addError(s"Cannot write extra fields to struct '${colPath.quoted}': $extraColsStr")
+      return Nil
+    } else if (inputCols.size < expectedCols.size) {
+      val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size)
+        .map(col => s"'${col.name}'")
+        .mkString(", ")
+      addError(s"Struct '${colPath.quoted}' missing fields: $missingColsStr")
+      return Nil
+    }
+
+    inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
+      val newColPath = colPath :+ expectedCol.name
+      (inputCol.dataType, expectedCol.dataType) match {
+        case (inputType: StructType, expectedType: StructType) =>
+          resolveStructType(
+            inputCol, inputType, expectedCol, expectedType,
+            byName = false, conf, addError, newColPath)
+        case (inputType: ArrayType, expectedType: ArrayType) =>
+          resolveArrayType(
+            inputCol, inputType, expectedCol, expectedType,
+            byName = false, conf, addError, newColPath)
+        case (inputType: MapType, expectedType: MapType) =>
+          resolveMapType(
+            inputCol, inputType, expectedCol, expectedType,
+            byName = false, conf, addError, newColPath)
+        case _ =>
+          checkField(expectedCol, inputCol, byName = false, conf, addError, newColPath)
+      }
+    }
+  }
+
   private def checkNullability(
       input: Expression,
       expected: Attribute,
       conf: SQLConf,
-      addError: String => Unit,
-      colPath: Seq[String]): Unit = {
-    if (input.nullable && !expected.nullable &&
-      conf.storeAssignmentPolicy != StoreAssignmentPolicy.LEGACY) {
-      addError(s"Cannot write nullable values to non-null column '${colPath.quoted}'")
+      colPath: Seq[String]): Expression = {
+    if (requiresNullChecks(input, expected, conf)) {
+      AssertNotNull(input, colPath)

Review Comment:
   > It throws a generic NPE, which I believe triggers task retries.
   
   This seems to be an existing problem, for example in ANSI mode. @gengliangwang shall we update the task retry logic so it does not retry when the exception carries an error class, which indicates a user error?
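   For example, a rough sketch of such a check (assuming we can plug it into the task failure handling; `shouldRetry` here is a hypothetical helper, not an existing API):
   
   ```scala
   import org.apache.spark.SparkThrowable
   
   // Exceptions that carry an error class are user-facing errors (e.g. the
   // runtime null check or ANSI cast failures); retrying the task won't help.
   def shouldRetry(t: Throwable): Boolean = t match {
     case st: SparkThrowable if st.getErrorClass != null => false
     case _ => true
   }
   ```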
   
   > The way the column path is formatted using new lines is a bit hard to read.
   
   We probably need to do both. The error reporting is also used for Dataset operations, and the newline format is better for displaying the object path.
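   For reference, a minimal sketch of what the runtime check amounts to on the user side (assuming a spark-shell session where `spark` is predefined; this is an illustration, not the PR change itself):
   
   ```scala
   import org.apache.spark.sql.Column
   import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
   import org.apache.spark.sql.functions.col
   
   // A nullable column on the write side.
   val df = spark.range(3).selectExpr("IF(id = 1, NULL, id) AS id")
   
   // Wrapping it in AssertNotNull mirrors what checkNullability now injects for
   // non-nullable target columns: analysis succeeds, but the NULL row fails at
   // runtime (currently with the generic NPE discussed above).
   val checked = new Column(AssertNotNull(col("id").expr, Seq("id")))
   df.select(checked).show()
   ```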


