You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/03/01 12:52:07 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #40233: [SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`

zhengruifeng opened a new pull request, #40233:
URL: https://github.com/apache/spark/pull/40233

   ### What changes were proposed in this pull request?
   Make `parse_data_type` use new proto message `DDLParse`
   
   ### Why are the changes needed?
   when the ddl_string represents an atomic type, existing method based on `DataFrame.schema` (which is always a StructType) has an extra conversion: AtomicType -> StructType -> AtomicType
   
   This PR leverages the dedicated proto message
   
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   
   ### How was this patch tested?
   existing UT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #40233: [SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40233:
URL: https://github.com/apache/spark/pull/40233#discussion_r1121682215


##########
python/pyspark/sql/connect/types.py:
##########
@@ -349,13 +349,9 @@ def parse_data_type(data_type: str) -> DataType:
     from pyspark.sql import SparkSession as PySparkSession
 
     assert is_remote()
-    return_type_schema = (
-        PySparkSession.builder.getOrCreate().createDataFrame(data=[], schema=data_type).schema
-    )
-    with_col_name = " " in data_type.strip()
-    if len(return_type_schema.fields) == 1 and not with_col_name:
-        # To match pyspark.sql.types._parse_datatype_string
-        return_type = return_type_schema.fields[0].dataType
-    else:
-        return_type = return_type_schema
-    return return_type
+    session = PySparkSession.builder.getOrCreate()
+    parsed = session.client._analyze(  # type: ignore[attr-defined]
+        method="ddl_parse", ddl_string=data_type
+    ).parsed

Review Comment:
   if I use `PySparkSession.builder.getOrCreate().client._analyze(method="ddl_parse", ddl_string=data_type)` here, the test `test_udf` always fails with:
   
   ```
   ======================================================================
   ERROR [4.853s]: test_udf (pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 746, in _analyze
       resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
     File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/grpc/_channel.py", line 944, in __call__
       state, call, = self._blocking(request, timeout, metadata, credentials,
     File "/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/grpc/_channel.py", line 926, in _blocking
       call = self._channel.segregated_call(
     File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 496, in grpc._cython.cygrpc.Channel.segregated_call
     File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 353, in grpc._cython.cygrpc._segregated_call
     File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 357, in grpc._cython.cygrpc._segregated_call
   ValueError: Cannot invoke RPC on closed channel!
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_function.py", line 2374, in test_udf
       cdf.withColumn("C", CF.udf(lambda x: len(x), "int")(cdf.c)).toPandas(),
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/functions.py", line 2463, in udf
       return _create_udf(f=f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/udf.py", line 59, in _create_udf
       udf_obj = UserDefinedFunction(
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/udf.py", line 103, in __init__
       parse_data_type(returnType) if isinstance(returnType, str) else returnType
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/types.py", line 353, in parse_data_type
       parsed = PySparkSession.builder.getOrCreate().client._analyze(  # type: ignore[attr-defined]
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 752, in _analyze
       return AnalyzeResult.fromProto(resp)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 950, in __exit__
       if self._can_retry(exc_val):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 461, in retry_exception
       return e.code() == grpc.StatusCode.UNAVAILABLE
   AttributeError: 'ValueError' object has no attribute 'code'
   
   ```
   
   Do you have any idea on it? @HyukjinKwon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on a diff in pull request #40233: [SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #40233:
URL: https://github.com/apache/spark/pull/40233#discussion_r1122521983


##########
python/pyspark/sql/connect/types.py:
##########
@@ -349,13 +349,9 @@ def parse_data_type(data_type: str) -> DataType:
     from pyspark.sql import SparkSession as PySparkSession
 
     assert is_remote()
-    return_type_schema = (
-        PySparkSession.builder.getOrCreate().createDataFrame(data=[], schema=data_type).schema
-    )
-    with_col_name = " " in data_type.strip()
-    if len(return_type_schema.fields) == 1 and not with_col_name:
-        # To match pyspark.sql.types._parse_datatype_string
-        return_type = return_type_schema.fields[0].dataType
-    else:
-        return_type = return_type_schema
-    return return_type
+    session = PySparkSession.builder.getOrCreate()
+    parsed = session.client._analyze(  # type: ignore[attr-defined]
+        method="ddl_parse", ddl_string=data_type
+    ).parsed
+    assert parsed is not None
+    return proto_schema_to_pyspark_data_type(parsed)

Review Comment:
   I guess we should convert from proto to DataType in `AnalyzeResult.fromProto`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #40233: [WIP][SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40233:
URL: https://github.com/apache/spark/pull/40233#issuecomment-1459503928

   close in favor of https://github.com/apache/spark/pull/40260


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng closed pull request #40233: [WIP][SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #40233: [WIP][SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`
URL: https://github.com/apache/spark/pull/40233


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40233: [SPARK-42630][CONNECT][PYTHON] Make `parse_data_type` use new proto message `DDLParse`

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40233:
URL: https://github.com/apache/spark/pull/40233#discussion_r1122463665


##########
python/pyspark/sql/connect/types.py:
##########
@@ -349,13 +349,9 @@ def parse_data_type(data_type: str) -> DataType:
     from pyspark.sql import SparkSession as PySparkSession
 
     assert is_remote()
-    return_type_schema = (
-        PySparkSession.builder.getOrCreate().createDataFrame(data=[], schema=data_type).schema
-    )
-    with_col_name = " " in data_type.strip()
-    if len(return_type_schema.fields) == 1 and not with_col_name:
-        # To match pyspark.sql.types._parse_datatype_string
-        return_type = return_type_schema.fields[0].dataType
-    else:
-        return_type = return_type_schema
-    return return_type
+    session = PySparkSession.builder.getOrCreate()
+    parsed = session.client._analyze(  # type: ignore[attr-defined]
+        method="ddl_parse", ddl_string=data_type
+    ).parsed

Review Comment:
   cc @xinrong-meng 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org