You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/10/12 23:48:33 UTC

[PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for non-nullable column [spark]

dtenedor opened a new pull request, #43356:
URL: https://github.com/apache/spark/pull/43356

   ### What changes were proposed in this pull request?
   
   This PR updates Python UDTF evaluation to return a useful error message if UDTF returns None for any non-nullable column.
   
   ### Why are the changes needed?
   
   Previously this case returned a null pointer exception.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, see above.
   
   ### How was this patch tested?
   
   This PR adds new test coverage.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #43356:
URL: https://github.com/apache/spark/pull/43356#issuecomment-1773464159

   The failed tests seem not related to this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #43356:
URL: https://github.com/apache/spark/pull/43356#issuecomment-1760545995

   cc @ueshin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1366194148


##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:
+        nonlocal return_type
+        for result_column_index in range(len(return_type)):
+
+            def check_for_none_in_non_nullable_column(
+                value: Any, data_type: DataType, nullable: bool
+            ) -> None:
+                if value is None and not nullable:
+                    raise PySparkRuntimeError(
+                        error_class="UDTF_EXEC_ERROR",
+                        message_parameters={
+                            "method_name": "eval' or 'terminate",
+                            "error": f"Column {result_column_index} within a returned row had a "
+                            + "value of None, either directly or within array/struct/map "
+                            + "subfields, but the corresponding column type was declared as "
+                            + "non-nullable; please update the UDTF to return a non-None value at "
+                            + "this location or otherwise declare the column type as nullable.",
+                        },
+                    )
+                elif (
+                    isinstance(data_type, ArrayType)
+                    and isinstance(value, list)
+                    and not data_type.containsNull
+                ):
+                    for sub_value in value:
+                        check_for_none_in_non_nullable_column(
+                            sub_value, data_type.elementType, data_type.containsNull
+                        )
+                elif isinstance(data_type, StructType) and isinstance(value, Row):
+                    for field_name, field_value in value.asDict().items():
+                        subfield: StructField = data_type[field_name]
+                        check_for_none_in_non_nullable_column(
+                            field_value, subfield.dataType, subfield.nullable
+                        )
+                elif isinstance(data_type, MapType):
+                    if isinstance(value, dict):
+                        items = value.items()
+                    elif isinstance(value, Row):
+                        items = value.asDict().items()

Review Comment:
   Turns out, never :) I removed this check, it is simpler now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1366195485


##########
python/pyspark/worker.py:
##########
@@ -879,6 +940,8 @@ def verify_result(result):
                 verify_pandas_result(
                     result, return_type, assign_cols_by_name=False, truncate_return_schema=False
                 )
+                for result_tuple in result.itertuples():
+                    check_output_row_against_schema(list(result_tuple))

Review Comment:
   I tried that originally but the UDTF results is an Iterable and it turns out that iterating through it consumes the values, making it impossible to create the DataFrame after because the iterator is empty :)



##########
python/pyspark/worker.py:
##########
@@ -879,6 +940,8 @@ def verify_result(result):
                 verify_pandas_result(
                     result, return_type, assign_cols_by_name=False, truncate_return_schema=False
                 )
+                for result_tuple in result.itertuples():
+                    check_output_row_against_schema(list(result_tuple))

Review Comment:
   I tried that originally but the UDTF result is an Iterable and it turns out that iterating through it consumes the values, making it impossible to create the DataFrame after because the iterator is empty :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for non-nullable column [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1357588599


##########
python/pyspark/worker.py:
##########
@@ -841,6 +841,27 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compute the set of UDTF result columns whose types are not nullable.
+    # We will check that the UDTF does not return None values for these columns below.
+    non_nullable_result_cols = set()
+    for i, field in enumerate(return_type):

Review Comment:
   Also we should check the other type of UDFs, including Scala UDFs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1366193870


##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:
+        nonlocal return_type
+        for result_column_index in range(len(return_type)):
+
+            def check_for_none_in_non_nullable_column(
+                value: Any, data_type: DataType, nullable: bool
+            ) -> None:
+                if value is None and not nullable:
+                    raise PySparkRuntimeError(
+                        error_class="UDTF_EXEC_ERROR",
+                        message_parameters={
+                            "method_name": "eval' or 'terminate",
+                            "error": f"Column {result_column_index} within a returned row had a "
+                            + "value of None, either directly or within array/struct/map "
+                            + "subfields, but the corresponding column type was declared as "
+                            + "non-nullable; please update the UDTF to return a non-None value at "
+                            + "this location or otherwise declare the column type as nullable.",
+                        },
+                    )
+                elif (
+                    isinstance(data_type, ArrayType)
+                    and isinstance(value, list)
+                    and not data_type.containsNull
+                ):
+                    for sub_value in value:
+                        check_for_none_in_non_nullable_column(
+                            sub_value, data_type.elementType, data_type.containsNull
+                        )
+                elif isinstance(data_type, StructType) and isinstance(value, Row):
+                    for field_name, field_value in value.asDict().items():

Review Comment:
   Good point; I switched this to iterate through the row using column indexes instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor closed pull request #43356: [SPARK-45523][Python] Return useful error message if UDTF returns None for non-nullable column
URL: https://github.com/apache/spark/pull/43356


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin closed pull request #43356: [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column
URL: https://github.com/apache/spark/pull/43356


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1364486288


##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -749,6 +749,363 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Python UDTF whose 'analyze' method sets state and reads it later"
   }
 
+  object TestPythonUDTFInvalidEvalReturnsNoneToNonNullableColumnScalarType extends TestUDTF {
+    val name: String = "TestPythonUDTFInvalidEvalReturnsNoneToNonNullableColumnScalarType"

Review Comment:
   It would be great if we could make this name a bit shorter :) 



##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:

Review Comment:
   @ueshin do you think this will add extra performance overhead if we check this for each output row?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1364577084


##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -749,6 +749,363 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Python UDTF whose 'analyze' method sets state and reads it later"
   }
 
+  object TestPythonUDTFInvalidEvalReturnsNoneToNonNullableColumnScalarType extends TestUDTF {
+    val name: String = "TestPythonUDTFInvalidEvalReturnsNoneToNonNullableColumnScalarType"

Review Comment:
   Sounds good, I made all these invalid test UDTF names shorter!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1366193147


##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:

Review Comment:
   Sure, I added these check back for onw.



##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:

Review Comment:
   Sure, I added this check back for onw.



##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:

Review Comment:
   Sure, I added this check back for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1358990069


##########
python/pyspark/worker.py:
##########
@@ -841,6 +841,27 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compute the set of UDTF result columns whose types are not nullable.
+    # We will check that the UDTF does not return None values for these columns below.
+    non_nullable_result_cols = set()
+    for i, field in enumerate(return_type):

Review Comment:
   Thanks for your reviews! I updated this PR to also check recursively for `None` values within array, struct, and map values as well.
   We can certainly consider other types of UDFs as well later (including Scala UDFs); it seems possible to decouple that work from Python UDTFs here, so I'll leave this PR to focus on the latter for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1364492809


##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:

Review Comment:
   Note: In a previous iteration of this PR, I had a check to see if the schema contained any non-nullable columns in order to enable this. However, I would like to extend these checks to compare provided row values against the expected output schema column types, which currently produce internal exceptions instead of good error messages if they don't match. We would need to check every value in every row in that case, so I figured it was OK to just do that here as well.



##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:

Review Comment:
   Note: In a previous iteration of this PR, I had a check to see if the schema contained any non-nullable columns in order to enable this. However, I would like to later extend these checks to compare provided row values against the expected output schema column types, which currently produce internal exceptions instead of good error messages if they don't match. We would need to check every value in every row in that case, so I figured it was OK to just do that here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for non-nullable column [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1357584727


##########
python/pyspark/worker.py:
##########
@@ -841,6 +841,27 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compute the set of UDTF result columns whose types are not nullable.
+    # We will check that the UDTF does not return None values for these columns below.
+    non_nullable_result_cols = set()
+    for i, field in enumerate(return_type):

Review Comment:
   I'm afraid this is not enough as potentially this may also happen in the nested fields, like array, map, and struct types.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1366193568


##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:
+        nonlocal return_type

Review Comment:
   removed this now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1365955494


##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:
+        nonlocal return_type

Review Comment:
   Do we need this?



##########
python/pyspark/worker.py:
##########
@@ -879,6 +940,8 @@ def verify_result(result):
                 verify_pandas_result(
                     result, return_type, assign_cols_by_name=False, truncate_return_schema=False
                 )
+                for result_tuple in result.itertuples():
+                    check_output_row_against_schema(list(result_tuple))

Review Comment:
   Shall we move this to before the pandas DataFrame is created?



##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:

Review Comment:
   Yes, this will add huge performance overhead.
   
   @dtenedor Could we at least build the check function based on the data type in advance?
   
   ```py
   check_output_row_against_schema = _build_null_checker(return_type)
   ```
   
   Checking the data type and `nullable` each row should be too expensive.
   
   The builder should be placed somewhere reusable.



##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:
+        nonlocal return_type
+        for result_column_index in range(len(return_type)):
+
+            def check_for_none_in_non_nullable_column(
+                value: Any, data_type: DataType, nullable: bool
+            ) -> None:
+                if value is None and not nullable:
+                    raise PySparkRuntimeError(
+                        error_class="UDTF_EXEC_ERROR",
+                        message_parameters={
+                            "method_name": "eval' or 'terminate",
+                            "error": f"Column {result_column_index} within a returned row had a "
+                            + "value of None, either directly or within array/struct/map "
+                            + "subfields, but the corresponding column type was declared as "
+                            + "non-nullable; please update the UDTF to return a non-None value at "
+                            + "this location or otherwise declare the column type as nullable.",
+                        },
+                    )
+                elif (
+                    isinstance(data_type, ArrayType)
+                    and isinstance(value, list)
+                    and not data_type.containsNull
+                ):
+                    for sub_value in value:
+                        check_for_none_in_non_nullable_column(
+                            sub_value, data_type.elementType, data_type.containsNull
+                        )
+                elif isinstance(data_type, StructType) and isinstance(value, Row):
+                    for field_name, field_value in value.asDict().items():

Review Comment:
   `adDict` will break the case there are duplicated field names.



##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:
+        nonlocal return_type
+        for result_column_index in range(len(return_type)):
+
+            def check_for_none_in_non_nullable_column(
+                value: Any, data_type: DataType, nullable: bool
+            ) -> None:
+                if value is None and not nullable:
+                    raise PySparkRuntimeError(
+                        error_class="UDTF_EXEC_ERROR",
+                        message_parameters={
+                            "method_name": "eval' or 'terminate",
+                            "error": f"Column {result_column_index} within a returned row had a "
+                            + "value of None, either directly or within array/struct/map "
+                            + "subfields, but the corresponding column type was declared as "
+                            + "non-nullable; please update the UDTF to return a non-None value at "
+                            + "this location or otherwise declare the column type as nullable.",
+                        },
+                    )
+                elif (
+                    isinstance(data_type, ArrayType)
+                    and isinstance(value, list)
+                    and not data_type.containsNull
+                ):
+                    for sub_value in value:
+                        check_for_none_in_non_nullable_column(
+                            sub_value, data_type.elementType, data_type.containsNull
+                        )
+                elif isinstance(data_type, StructType) and isinstance(value, Row):
+                    for field_name, field_value in value.asDict().items():
+                        subfield: StructField = data_type[field_name]
+                        check_for_none_in_non_nullable_column(
+                            field_value, subfield.dataType, subfield.nullable
+                        )
+                elif isinstance(data_type, MapType):
+                    if isinstance(value, dict):
+                        items = value.items()
+                    elif isinstance(value, Row):
+                        items = value.asDict().items()

Review Comment:
   In what case does this happen?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #43356:
URL: https://github.com/apache/spark/pull/43356#issuecomment-1773464269

   Thanks! merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org