You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/03/01 12:54:11 UTC
[GitHub] [spark] zhengruifeng commented on a diff in pull request #40233: [SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`
zhengruifeng commented on code in PR #40233:
URL: https://github.com/apache/spark/pull/40233#discussion_r1121682215
##########
python/pyspark/sql/connect/types.py:
##########
@@ -349,13 +349,9 @@ def parse_data_type(data_type: str) -> DataType:
from pyspark.sql import SparkSession as PySparkSession
assert is_remote()
- return_type_schema = (
- PySparkSession.builder.getOrCreate().createDataFrame(data=[], schema=data_type).schema
- )
- with_col_name = " " in data_type.strip()
- if len(return_type_schema.fields) == 1 and not with_col_name:
- # To match pyspark.sql.types._parse_datatype_string
- return_type = return_type_schema.fields[0].dataType
- else:
- return_type = return_type_schema
- return return_type
+ session = PySparkSession.builder.getOrCreate()
+ parsed = session.client._analyze( # type: ignore[attr-defined]
+ method="ddl_parse", ddl_string=data_type
+ ).parsed
Review Comment:
If I use `PySparkSession.builder.getOrCreate().client._analyze(method="ddl_parse", ddl_string=data_type)` here, the test `test_udf` always fails with:
```
======================================================================
ERROR [4.853s]: test_udf (pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 746, in _analyze
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/grpc/_channel.py", line 944, in __call__
state, call, = self._blocking(request, timeout, metadata, credentials,
File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/grpc/_channel.py", line 926, in _blocking
call = self._channel.segregated_call(
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 496, in grpc._cython.cygrpc.Channel.segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 353, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 357, in grpc._cython.cygrpc._segregated_call
ValueError: Cannot invoke RPC on closed channel!
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_function.py", line 2374, in test_udf
cdf.withColumn("C", CF.udf(lambda x: len(x), "int")(cdf.c)).toPandas(),
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/functions.py", line 2463, in udf
return _create_udf(f=f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF)
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/udf.py", line 59, in _create_udf
udf_obj = UserDefinedFunction(
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/udf.py", line 103, in __init__
parse_data_type(returnType) if isinstance(returnType, str) else returnType
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/types.py", line 353, in parse_data_type
parsed = PySparkSession.builder.getOrCreate().client._analyze( # type: ignore[attr-defined]
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 752, in _analyze
return AnalyzeResult.fromProto(resp)
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 950, in __exit__
if self._can_retry(exc_val):
File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 461, in retry_exception
return e.code() == grpc.StatusCode.UNAVAILABLE
AttributeError: 'ValueError' object has no attribute 'code'
```
Do you have any idea what is causing this? @HyukjinKwon
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org