Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/17 00:02:18 UTC
[spark] branch master updated: [SPARK-41115][CONNECT] Add ClientType to proto to indicate which client sends a request
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8fb61d2bdf1 [SPARK-41115][CONNECT] Add ClientType to proto to indicate which client sends a request
8fb61d2bdf1 is described below
commit 8fb61d2bdf1f4ee6c04fb60c233addfaa55e1d1a
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Nov 17 09:01:26 2022 +0900
[SPARK-41115][CONNECT] Add ClientType to proto to indicate which client sends a request
### What changes were proposed in this pull request?
This PR introduces a `client_type` field into the Connect proto that can be included in a `Request` to indicate the client type (e.g. Python client, Scala client, etc.).
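For illustration, a minimal sketch of how a client tags a request with the new field (assuming the generated `base_pb2` module is imported as `pb2`, as in `client.py`):

```python
from pyspark.sql.connect.proto import base_pb2 as pb2

# Build a request and mark it as coming from the Python client; the
# server only logs this value and never interprets it.
req = pb2.Request()
req.client_type = "_SPARK_CONNECT_PYTHON"
```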
### Why are the changes needed?
It helps the server better understand incoming requests, e.g. which client language issued them.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT
Closes #38630 from amaliujia/client-type.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../src/main/protobuf/spark/connect/base.proto | 5 +++
python/pyspark/sql/connect/client.py | 16 ++++----
python/pyspark/sql/connect/proto/base_pb2.py | 44 +++++++++++-----------
python/pyspark/sql/connect/proto/base_pb2.pyi | 32 +++++++++++++++-
4 files changed, 65 insertions(+), 32 deletions(-)
diff --git a/connector/connect/src/main/protobuf/spark/connect/base.proto b/connector/connect/src/main/protobuf/spark/connect/base.proto
index 5f59ada38b6..a521eab20d8 100644
--- a/connector/connect/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -48,6 +48,11 @@ message Request {
// The logical plan to be executed / analyzed.
Plan plan = 3;
+ // Provides optional information about the client sending the request. This field
+ // can be used for language or version specific information and is only intended for
+ // logging purposes and will not be interpreted by the server.
+ optional string client_type = 4;
+
// User Context is used to refer to one particular user session that is executing
// queries in the backend.
message UserContext {
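Because `client_type` is declared `optional` in proto3, the generated Python bindings track whether it was explicitly set through a synthetic oneof named `_client_type` (visible in the `base_pb2.pyi` changes further down). A brief sketch of the presence semantics:

```python
from pyspark.sql.connect.proto import base_pb2 as pb2

req = pb2.Request()
assert not req.HasField("client_type")  # unset until assigned
req.client_type = "_SPARK_CONNECT_PYTHON"
assert req.HasField("client_type")
assert req.WhichOneof("_client_type") == "client_type"
```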
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 62949720134..3c3203a8f51 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -301,9 +301,7 @@ class RemoteSparkSession(object):
fun.parts.append(name)
fun.serialized_function = cloudpickle.dumps((function, return_type))
- req = pb2.Request()
- if self._user_id is not None:
- req.user_context.user_id = self._user_id
+ req = self._request_with_metadata()
req.plan.command.create_function.CopyFrom(fun)
self._execute_and_fetch(req)
@@ -357,9 +355,7 @@ class RemoteSparkSession(object):
)
def _to_pandas(self, plan: pb2.Plan) -> Optional[pandas.DataFrame]:
- req = pb2.Request()
- if self._user_id is not None:
- req.user_context.user_id = self._user_id
+ req = self._request_with_metadata()
req.plan.CopyFrom(plan)
return self._execute_and_fetch(req)
@@ -407,12 +403,16 @@ class RemoteSparkSession(object):
req.plan.command.CopyFrom(command)
self._execute_and_fetch(req)
- def _analyze(self, plan: pb2.Plan) -> AnalyzeResult:
+ def _request_with_metadata(self) -> pb2.Request:
req = pb2.Request()
+ req.client_type = "_SPARK_CONNECT_PYTHON"
if self._user_id:
req.user_context.user_id = self._user_id
- req.plan.CopyFrom(plan)
+ return req
+ def _analyze(self, plan: pb2.Plan) -> AnalyzeResult:
+ req = self._request_with_metadata()
+ req.plan.CopyFrom(plan)
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
return AnalyzeResult.fromProto(resp)
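Consolidated from the hunks above for readability, the new helper on `RemoteSparkSession` that all call sites now share; it stamps every outgoing request in one place instead of repeating the user-context boilerplate:

```python
def _request_with_metadata(self) -> pb2.Request:
    # Attach the Python client marker and, when known, the user id
    # for this session to every outgoing request.
    req = pb2.Request()
    req.client_type = "_SPARK_CONNECT_PYTHON"
    if self._user_id:
        req.user_context.user_id = self._user_id
    return req
```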
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 1f577089d1a..0527e9b49aa 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\x92\x02\n\x07Request\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12\x45\n\x0cuser_contex [...]
+ b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xc8\x02\n\x07Request\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12\x45\n\x0cuser_contex [...]
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -49,25 +49,25 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_PLAN._serialized_start = 158
_PLAN._serialized_end = 274
_REQUEST._serialized_start = 277
- _REQUEST._serialized_end = 551
- _REQUEST_USERCONTEXT._serialized_start = 429
- _REQUEST_USERCONTEXT._serialized_end = 551
- _RESPONSE._serialized_start = 554
- _RESPONSE._serialized_end = 1418
- _RESPONSE_ARROWBATCH._serialized_start = 793
- _RESPONSE_ARROWBATCH._serialized_end = 854
- _RESPONSE_JSONBATCH._serialized_start = 856
- _RESPONSE_JSONBATCH._serialized_end = 916
- _RESPONSE_METRICS._serialized_start = 919
- _RESPONSE_METRICS._serialized_end = 1403
- _RESPONSE_METRICS_METRICOBJECT._serialized_start = 1003
- _RESPONSE_METRICS_METRICOBJECT._serialized_end = 1313
- _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1201
- _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 1313
- _RESPONSE_METRICS_METRICVALUE._serialized_start = 1315
- _RESPONSE_METRICS_METRICVALUE._serialized_end = 1403
- _ANALYZERESPONSE._serialized_start = 1421
- _ANALYZERESPONSE._serialized_end = 1555
- _SPARKCONNECTSERVICE._serialized_start = 1558
- _SPARKCONNECTSERVICE._serialized_end = 1720
+ _REQUEST._serialized_end = 605
+ _REQUEST_USERCONTEXT._serialized_start = 467
+ _REQUEST_USERCONTEXT._serialized_end = 589
+ _RESPONSE._serialized_start = 608
+ _RESPONSE._serialized_end = 1472
+ _RESPONSE_ARROWBATCH._serialized_start = 847
+ _RESPONSE_ARROWBATCH._serialized_end = 908
+ _RESPONSE_JSONBATCH._serialized_start = 910
+ _RESPONSE_JSONBATCH._serialized_end = 970
+ _RESPONSE_METRICS._serialized_start = 973
+ _RESPONSE_METRICS._serialized_end = 1457
+ _RESPONSE_METRICS_METRICOBJECT._serialized_start = 1057
+ _RESPONSE_METRICS_METRICOBJECT._serialized_end = 1367
+ _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1255
+ _RESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 1367
+ _RESPONSE_METRICS_METRICVALUE._serialized_start = 1369
+ _RESPONSE_METRICS_METRICVALUE._serialized_end = 1457
+ _ANALYZERESPONSE._serialized_start = 1475
+ _ANALYZERESPONSE._serialized_end = 1609
+ _SPARKCONNECTSERVICE._serialized_start = 1612
+ _SPARKCONNECTSERVICE._serialized_end = 1774
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index bf6d080d9fd..e70f9db14a3 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -135,6 +135,7 @@ class Request(google.protobuf.message.Message):
CLIENT_ID_FIELD_NUMBER: builtins.int
USER_CONTEXT_FIELD_NUMBER: builtins.int
PLAN_FIELD_NUMBER: builtins.int
+ CLIENT_TYPE_FIELD_NUMBER: builtins.int
client_id: builtins.str
"""The client_id is set by the client to be able to collate streaming responses from
different queries.
@@ -145,23 +146,50 @@ class Request(google.protobuf.message.Message):
@property
def plan(self) -> global___Plan:
"""The logical plan to be executed / analyzed."""
+ client_type: builtins.str
+ """Provides optional information about the client sending the request. This field
+ can be used for language or version specific information and is only intended for
+ logging purposes and will not be interpreted by the server.
+ """
def __init__(
self,
*,
client_id: builtins.str = ...,
user_context: global___Request.UserContext | None = ...,
plan: global___Plan | None = ...,
+ client_type: builtins.str | None = ...,
) -> None: ...
def HasField(
self,
- field_name: typing_extensions.Literal["plan", b"plan", "user_context", b"user_context"],
+ field_name: typing_extensions.Literal[
+ "_client_type",
+ b"_client_type",
+ "client_type",
+ b"client_type",
+ "plan",
+ b"plan",
+ "user_context",
+ b"user_context",
+ ],
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
- "client_id", b"client_id", "plan", b"plan", "user_context", b"user_context"
+ "_client_type",
+ b"_client_type",
+ "client_id",
+ b"client_id",
+ "client_type",
+ b"client_type",
+ "plan",
+ b"plan",
+ "user_context",
+ b"user_context",
],
) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_client_type", b"_client_type"]
+ ) -> typing_extensions.Literal["client_type"] | None: ...
global___Request = Request
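On the server side the field is logging-only, per the proto comment. A hypothetical sketch of how a handler might surface it (`log_request` is illustrative and not part of this patch):

```python
from pyspark.sql.connect.proto import base_pb2 as pb2

def log_request(req: pb2.Request) -> None:
    # client_type is informational; fall back to a placeholder when
    # the optional field was never set.
    client = req.client_type if req.HasField("client_type") else "<unknown>"
    print(f"request client_id={req.client_id} client_type={client}")
```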