You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/17 00:02:18 UTC

[spark] branch master updated: [SPARK-41115][CONNECT] Add ClientType to proto to indicate which client sends a request

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fb61d2bdf1 [SPARK-41115][CONNECT] Add ClientType to proto to indicate which client sends a request
8fb61d2bdf1 is described below

commit 8fb61d2bdf1f4ee6c04fb60c233addfaa55e1d1a
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Nov 17 09:01:26 2022 +0900

    [SPARK-41115][CONNECT] Add ClientType to proto to indicate which client sends a request
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a `ClientType` (an optional `client_type` string field) into the Connect proto that can be included in a Request to indicate the client type (e.g. Python client, Scala client, etc.).
    
    ### Why are the changes needed?
    
    They allow the server to better understand the requests it receives.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests (UT).
    
    Closes #38630 from amaliujia/client-type.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../src/main/protobuf/spark/connect/base.proto     |  5 +++
 python/pyspark/sql/connect/client.py               | 16 ++++----
 python/pyspark/sql/connect/proto/base_pb2.py       | 44 +++++++++++-----------
 python/pyspark/sql/connect/proto/base_pb2.pyi      | 32 +++++++++++++++-
 4 files changed, 65 insertions(+), 32 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/base.proto b/connector/connect/src/main/protobuf/spark/connect/base.proto
index 5f59ada38b6..a521eab20d8 100644
--- a/connector/connect/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -48,6 +48,11 @@ message Request {
   // The logical plan to be executed / analyzed.
   Plan plan = 3;
 
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
   // User Context is used to refer to one particular user session that is executing
   // queries in the backend.
   message UserContext {
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 62949720134..3c3203a8f51 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -301,9 +301,7 @@ class RemoteSparkSession(object):
         fun.parts.append(name)
         fun.serialized_function = cloudpickle.dumps((function, return_type))
 
-        req = pb2.Request()
-        if self._user_id is not None:
-            req.user_context.user_id = self._user_id
+        req = self._request_with_metadata()
         req.plan.command.create_function.CopyFrom(fun)
 
         self._execute_and_fetch(req)
@@ -357,9 +355,7 @@ class RemoteSparkSession(object):
         )
 
     def _to_pandas(self, plan: pb2.Plan) -> Optional[pandas.DataFrame]:
-        req = pb2.Request()
-        if self._user_id is not None:
-            req.user_context.user_id = self._user_id
+        req = self._request_with_metadata()
         req.plan.CopyFrom(plan)
         return self._execute_and_fetch(req)
 
@@ -407,12 +403,16 @@ class RemoteSparkSession(object):
         req.plan.command.CopyFrom(command)
         self._execute_and_fetch(req)
 
-    def _analyze(self, plan: pb2.Plan) -> AnalyzeResult:
+    def _request_with_metadata(self) -> pb2.Request:
         req = pb2.Request()
+        req.client_type = "_SPARK_CONNECT_PYTHON"
         if self._user_id:
             req.user_context.user_id = self._user_id
-        req.plan.CopyFrom(plan)
+        return req
 
+    def _analyze(self, plan: pb2.Plan) -> AnalyzeResult:
+        req = self._request_with_metadata()
+        req.plan.CopyFrom(plan)
         resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
         return AnalyzeResult.fromProto(resp)
 
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 1f577089d1a..0527e9b49aa 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\x92\x02\n\x07Request\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12\x45\n\x0cuser_contex [...]
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xc8\x02\n\x07Request\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12\x45\n\x0cuser_contex [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -49,25 +49,25 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _PLAN._serialized_start = 158
     _PLAN._serialized_end = 274
     _REQUEST._serialized_start = 277
-    _REQUEST._serialized_end = 551
-    _REQUEST_USERCONTEXT._serialized_start = 429
-    _REQUEST_USERCONTEXT._serialized_end = 551
-    _RESPONSE._serialized_start = 554
-    _RESPONSE._serialized_end = 1418
-    _RESPONSE_ARROWBATCH._serialized_start = 793
-    _RESPONSE_ARROWBATCH._serialized_end = 854
-    _RESPONSE_JSONBATCH._serialized_start = 856
-    _RESPONSE_JSONBATCH._serialized_end = 916
-    _RESPONSE_METRICS._serialized_start = 919
-    _RESPONSE_METRICS._serialized_end = 1403
-    _RESPONSE_METRICS_METRICOBJECT._serialized_start = 1003
-    _RESPONSE_METRICS_METRICOBJECT._serialized_end = 1313
-    _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1201
-    _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 1313
-    _RESPONSE_METRICS_METRICVALUE._serialized_start = 1315
-    _RESPONSE_METRICS_METRICVALUE._serialized_end = 1403
-    _ANALYZERESPONSE._serialized_start = 1421
-    _ANALYZERESPONSE._serialized_end = 1555
-    _SPARKCONNECTSERVICE._serialized_start = 1558
-    _SPARKCONNECTSERVICE._serialized_end = 1720
+    _REQUEST._serialized_end = 605
+    _REQUEST_USERCONTEXT._serialized_start = 467
+    _REQUEST_USERCONTEXT._serialized_end = 589
+    _RESPONSE._serialized_start = 608
+    _RESPONSE._serialized_end = 1472
+    _RESPONSE_ARROWBATCH._serialized_start = 847
+    _RESPONSE_ARROWBATCH._serialized_end = 908
+    _RESPONSE_JSONBATCH._serialized_start = 910
+    _RESPONSE_JSONBATCH._serialized_end = 970
+    _RESPONSE_METRICS._serialized_start = 973
+    _RESPONSE_METRICS._serialized_end = 1457
+    _RESPONSE_METRICS_METRICOBJECT._serialized_start = 1057
+    _RESPONSE_METRICS_METRICOBJECT._serialized_end = 1367
+    _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1255
+    _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 1367
+    _RESPONSE_METRICS_METRICVALUE._serialized_start = 1369
+    _RESPONSE_METRICS_METRICVALUE._serialized_end = 1457
+    _ANALYZERESPONSE._serialized_start = 1475
+    _ANALYZERESPONSE._serialized_end = 1609
+    _SPARKCONNECTSERVICE._serialized_start = 1612
+    _SPARKCONNECTSERVICE._serialized_end = 1774
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index bf6d080d9fd..e70f9db14a3 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -135,6 +135,7 @@ class Request(google.protobuf.message.Message):
     CLIENT_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     PLAN_FIELD_NUMBER: builtins.int
+    CLIENT_TYPE_FIELD_NUMBER: builtins.int
     client_id: builtins.str
     """The client_id is set by the client to be able to collate streaming responses from
     different queries.
@@ -145,23 +146,50 @@ class Request(google.protobuf.message.Message):
     @property
     def plan(self) -> global___Plan:
         """The logical plan to be executed / analyzed."""
+    client_type: builtins.str
+    """Provides optional information about the client sending the request. This field
+    can be used for language or version specific information and is only intended for
+    logging purposes and will not be interpreted by the server.
+    """
     def __init__(
         self,
         *,
         client_id: builtins.str = ...,
         user_context: global___Request.UserContext | None = ...,
         plan: global___Plan | None = ...,
+        client_type: builtins.str | None = ...,
     ) -> None: ...
     def HasField(
         self,
-        field_name: typing_extensions.Literal["plan", b"plan", "user_context", b"user_context"],
+        field_name: typing_extensions.Literal[
+            "_client_type",
+            b"_client_type",
+            "client_type",
+            b"client_type",
+            "plan",
+            b"plan",
+            "user_context",
+            b"user_context",
+        ],
     ) -> builtins.bool: ...
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
-            "client_id", b"client_id", "plan", b"plan", "user_context", b"user_context"
+            "_client_type",
+            b"_client_type",
+            "client_id",
+            b"client_id",
+            "client_type",
+            b"client_type",
+            "plan",
+            b"plan",
+            "user_context",
+            b"user_context",
         ],
     ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
+    ) -> typing_extensions.Literal["client_type"] | None: ...
 
 global___Request = Request
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org