Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/03/01 12:54:11 UTC

[GitHub] [spark] zhengruifeng commented on a diff in pull request #40233: [SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`

zhengruifeng commented on code in PR #40233:
URL: https://github.com/apache/spark/pull/40233#discussion_r1121682215


##########
python/pyspark/sql/connect/types.py:
##########
@@ -349,13 +349,9 @@ def parse_data_type(data_type: str) -> DataType:
     from pyspark.sql import SparkSession as PySparkSession
 
     assert is_remote()
-    return_type_schema = (
-        PySparkSession.builder.getOrCreate().createDataFrame(data=[], schema=data_type).schema
-    )
-    with_col_name = " " in data_type.strip()
-    if len(return_type_schema.fields) == 1 and not with_col_name:
-        # To match pyspark.sql.types._parse_datatype_string
-        return_type = return_type_schema.fields[0].dataType
-    else:
-        return_type = return_type_schema
-    return return_type
+    session = PySparkSession.builder.getOrCreate()
+    parsed = session.client._analyze(  # type: ignore[attr-defined]
+        method="ddl_parse", ddl_string=data_type
+    ).parsed
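
For context, here is a hedged usage sketch of `parse_data_type` after this change (an editor's illustration, not code from the PR). It assumes an active Spark Connect session so that the `is_remote()` assertion holds, and that the DDLParse path preserves the old behavior of unwrapping a single unnamed field:

```python
# Illustrative only: parse_data_type sends the DDL string to the server
# via the new "ddl_parse" analyze method and returns a DataType.
from pyspark.sql.connect.types import parse_data_type
from pyspark.sql.types import IntegerType, StructType

# A bare type name yields the type itself, matching
# pyspark.sql.types._parse_datatype_string (behavior assumed preserved).
assert isinstance(parse_data_type("int"), IntegerType)

# A DDL string with column names yields a StructType.
assert isinstance(parse_data_type("a int, b string"), StructType)
```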

Review Comment:
   If I use `PySparkSession.builder.getOrCreate().client._analyze(method="ddl_parse", ddl_string=data_type)` here, chaining the call in a single expression instead of first assigning the session as above, the test `test_udf` always fails with:
   
   ```
   ======================================================================
   ERROR [4.853s]: test_udf (pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 746, in _analyze
       resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
     File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/grpc/_channel.py", line 944, in __call__
       state, call, = self._blocking(request, timeout, metadata, credentials,
     File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/grpc/_channel.py", line 926, in _blocking
       call = self._channel.segregated_call(
     File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 496, in grpc._cython.cygrpc.Channel.segregated_call
     File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 353, in grpc._cython.cygrpc._segregated_call
     File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 357, in grpc._cython.cygrpc._segregated_call
   ValueError: Cannot invoke RPC on closed channel!
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_function.py", line 2374, in test_udf
       cdf.withColumn("C", CF.udf(lambda x: len(x), "int")(cdf.c)).toPandas(),
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/functions.py", line 2463, in udf
       return _create_udf(f=f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/udf.py", line 59, in _create_udf
       udf_obj = UserDefinedFunction(
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/udf.py", line 103, in __init__
       parse_data_type(returnType) if isinstance(returnType, str) else returnType
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/types.py", line 353, in parse_data_type
       parsed = PySparkSession.builder.getOrCreate().client._analyze(  # type: ignore[attr-defined]
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 752, in _analyze
       return AnalyzeResult.fromProto(resp)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 950, in __exit__
       if self._can_retry(exc_val):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 461, in retry_exception
       return e.code() == grpc.StatusCode.UNAVAILABLE
   AttributeError: 'ValueError' object has no attribute 'code'
   
   ```
   
   Do you have any idea about this? @HyukjinKwon 
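
   For illustration, note that the secondary failure in the traceback comes from the retry predicate at `client.py:461`, which calls `e.code()` on whatever exception reaches it; a non-gRPC error such as this `ValueError` has no `code()` method, so the original "closed channel" error gets masked. A hedged sketch of a type guard that would let the original error surface (an editor's illustration, not the fix adopted in the PR):

   ```python
   import grpc

   def retry_exception(e: Exception) -> bool:
       # Only gRPC errors expose .code(); anything else, such as the
       # ValueError raised on a closed channel, should not be retried
       # and should propagate unchanged.
       return isinstance(e, grpc.RpcError) and e.code() == grpc.StatusCode.UNAVAILABLE
   ```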


