Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/07 02:11:40 UTC
[spark] branch branch-3.4 updated: [SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 2bce5314da3 [SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id
2bce5314da3 is described below
commit 2bce5314da32c1fac7a6025449cb786c9b5c233b
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Mon Mar 6 22:11:18 2023 -0400
[SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id
### What changes were proposed in this pull request?
Rename the `client_id` field in the Connect proto requests to `session_id`.
On the one hand, when I read `client_id` I was confused about what it is used for, even after reading the proto documentation.
On the other hand, the client sides already use `session_id` internally:
https://github.com/apache/spark/blob/9bf174f9722e34f13bfaede5e59f989bf2a511e9/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala#L51
https://github.com/apache/spark/blob/9bf174f9722e34f13bfaede5e59f989bf2a511e9/python/pyspark/sql/connect/client.py#L522
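For illustration, a minimal sketch of request construction after the rename, mirroring the SparkConnectClient changes in the diff below (`plan`, `userContext`, `sessionId`, and `userAgent` stand in for the client's existing state):

    val request = proto.ExecutePlanRequest
      .newBuilder()
      .setPlan(plan)                // (Required) the logical plan to execute
      .setUserContext(userContext)  // (Required) carries user_context.user_id
      .setSessionId(sessionId)      // renamed: was setClientId(sessionId)
      .setClientType(userAgent)
      .build()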
### Why are the changes needed?
Code readability
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Existing UT
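For reference, the round-trip assertion those tests exercise, adapted from the SparkConnectClientSuite changes in this diff (`client` is assumed to be the suite's client connected to its dummy server):

    val request = AnalyzePlanRequest.newBuilder().setSessionId("abc123").build()
    val response = client.analyze(request)
    assert(response.getSessionId === "abc123")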
Closes #40309 from amaliujia/update_client_id.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit dfdc4a1d65e69ed88d43e17bd7325e9f8416c8e6)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../sql/connect/client/SparkConnectClient.scala | 6 +-
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 4 +-
.../connect/client/SparkConnectClientSuite.scala | 14 +-
.../src/main/protobuf/spark/connect/base.proto | 41 +++--
.../sql/connect/planner/SparkConnectPlanner.scala | 10 +-
.../service/SparkConnectAnalyzeHandler.scala | 4 +-
.../service/SparkConnectConfigHandler.scala | 4 +-
.../service/SparkConnectStreamHandler.scala | 24 +--
.../connect/planner/SparkConnectServiceSuite.scala | 2 +-
python/pyspark/sql/connect/client.py | 22 +--
python/pyspark/sql/connect/proto/base_pb2.py | 196 ++++++++++-----------
python/pyspark/sql/connect/proto/base_pb2.pyi | 98 ++++++-----
python/pyspark/sql/tests/connect/test_client.py | 2 +-
13 files changed, 223 insertions(+), 204 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 05aa191a4dd..348fc94bb89 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -63,7 +63,7 @@ private[sql] class SparkConnectClient(
.newBuilder()
.setPlan(plan)
.setUserContext(userContext)
- .setClientId(sessionId)
+ .setSessionId(sessionId)
.setClientType(userAgent)
.build()
stub.executePlan(request)
@@ -78,7 +78,7 @@ private[sql] class SparkConnectClient(
val request = proto.ConfigRequest
.newBuilder()
.setOperation(operation)
- .setClientId(sessionId)
+ .setSessionId(sessionId)
.setClientType(userAgent)
.setUserContext(userContext)
.build()
@@ -157,7 +157,7 @@ private[sql] class SparkConnectClient(
private def analyze(builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = {
val request = builder
.setUserContext(userContext)
- .setClientId(sessionId)
+ .setSessionId(sessionId)
.setClientType(userAgent)
.build()
analyze(request)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 11e28f538e8..94bc22ef77d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -612,8 +612,8 @@ class ClientE2ETestSuite extends RemoteSparkSession {
}
test("SparkSession newSession") {
- val oldId = spark.sql("SELECT 1").analyze.getClientId
- val newId = spark.newSession().sql("SELECT 1").analyze.getClientId
+ val oldId = spark.sql("SELECT 1").analyze.getSessionId
+ val newId = spark.newSession().sql("SELECT 1").analyze.getSessionId
assert(oldId != newId)
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index dcb13589206..bc600e5a071 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -75,11 +75,11 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
client = clientBuilder(server.getPort)
val request = AnalyzePlanRequest
.newBuilder()
- .setClientId("abc123")
+ .setSessionId("abc123")
.build()
val response = client.analyze(request)
- assert(response.getClientId === "abc123")
+ assert(response.getSessionId === "abc123")
}
test("Test connection") {
@@ -99,7 +99,7 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
.connectionString(s"sc://localhost:${server.getPort}/;use_ssl=true")
.build()
- val request = AnalyzePlanRequest.newBuilder().setClientId("abc123").build()
+ val request = AnalyzePlanRequest.newBuilder().setSessionId("abc123").build()
// Failed the ssl handshake as the dummy server does not have any server credentials installed.
assertThrows[StatusRuntimeException] {
@@ -201,11 +201,11 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
request: ExecutePlanRequest,
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
// Reply with a dummy response using the same session ID
- val requestClientId = request.getClientId
+ val requestSessionId = request.getSessionId
inputPlan = request.getPlan
val response = ExecutePlanResponse
.newBuilder()
- .setClientId(requestClientId)
+ .setSessionId(requestSessionId)
.build()
responseObserver.onNext(response)
responseObserver.onCompleted()
@@ -215,7 +215,7 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
request: AnalyzePlanRequest,
responseObserver: StreamObserver[AnalyzePlanResponse]): Unit = {
// Reply with a dummy response using the same session ID
- val requestClientId = request.getClientId
+ val requestSessionId = request.getSessionId
request.getAnalyzeCase match {
case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
inputPlan = request.getSchema.getPlan
@@ -233,7 +233,7 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
}
val response = AnalyzePlanResponse
.newBuilder()
- .setClientId(requestClientId)
+ .setSessionId(requestSessionId)
.build()
responseObserver.onNext(response)
responseObserver.onCompleted()
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 2252d91c9ff..1a9c437f0ec 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -58,9 +58,10 @@ message UserContext {
message AnalyzePlanRequest {
// (Required)
//
- // The client_id is set by the client to be able to collate streaming responses from
- // different queries.
- string client_id = 1;
+ // The session_id specifies a spark session for a user id (which is specified
+ // by user_context.user_id). The session_id is set by the client to be able to
+ // collate streaming responses from different queries within the dedicated session.
+ string session_id = 1;
// (Required) User context
UserContext user_context = 2;
@@ -161,7 +162,7 @@ message AnalyzePlanRequest {
// Response to performing analysis of the query. Contains relevant metadata to be able to
// reason about the performance.
message AnalyzePlanResponse {
- string client_id = 1;
+ string session_id = 1;
oneof result {
Schema schema = 2;
@@ -217,11 +218,15 @@ message AnalyzePlanResponse {
message ExecutePlanRequest {
// (Required)
//
- // The client_id is set by the client to be able to collate streaming responses from
- // different queries.
- string client_id = 1;
+ // The session_id specifies a spark session for a user id (which is specified
+ // by user_context.user_id). The session_id is set by the client to be able to
+ // collate streaming responses from different queries within the dedicated session.
+ string session_id = 1;
// (Required) User context
+ //
+ // user_context.user_id and session_id both identify a unique remote spark session on the
+ // server side.
UserContext user_context = 2;
// (Required) The logical plan to be executed / analyzed.
@@ -234,9 +239,9 @@ message ExecutePlanRequest {
}
// The response of a query, can be one or more for each request. Responses belonging to the
-// same input query, carry the same `client_id`.
+// same input query, carry the same `session_id`.
message ExecutePlanResponse {
- string client_id = 1;
+ string session_id = 1;
// Union type for the different response messages.
oneof response_type {
@@ -304,9 +309,10 @@ message KeyValue {
message ConfigRequest {
// (Required)
//
- // The client_id is set by the client to be able to collate streaming responses from
- // different queries.
- string client_id = 1;
+ // The session_id specifies a spark session for a user id (which is specified
+ // by user_context.user_id). The session_id is set by the client to be able to
+ // collate streaming responses from different queries within the dedicated session.
+ string session_id = 1;
// (Required) User context
UserContext user_context = 2;
@@ -369,7 +375,7 @@ message ConfigRequest {
// Response to the config request.
message ConfigResponse {
- string client_id = 1;
+ string session_id = 1;
// (Optional) The result key-value pairs.
//
@@ -386,9 +392,12 @@ message ConfigResponse {
// Request to transfer client-local artifacts.
message AddArtifactsRequest {
- // The client_id is set by the client to be able to collate streaming responses from
- // different queries.
- string client_id = 1;
+ // (Required)
+ //
+ // The session_id specifies a spark session for a user id (which is specified
+ // by user_context.user_id). The session_id is set by the client to be able to
+ // collate streaming responses from different queries within the dedicated session.
+ string session_id = 1;
// User context
UserContext user_context = 2;
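As the updated field comments state, user_context.user_id and session_id together identify the isolated session on the server side; a minimal sketch of that lookup, mirroring the handler changes further below:

    val session = SparkConnectService
      .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
      .session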
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 60fb94e8098..8ca004d520c 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1459,7 +1459,7 @@ class SparkConnectPlanner(val session: SparkSession) {
def process(
command: proto.Command,
- clientId: String,
+ sessionId: String,
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
command.getCommandTypeCase match {
case proto.Command.CommandTypeCase.REGISTER_FUNCTION =>
@@ -1473,14 +1473,14 @@ class SparkConnectPlanner(val session: SparkSession) {
case proto.Command.CommandTypeCase.EXTENSION =>
handleCommandPlugin(command.getExtension)
case proto.Command.CommandTypeCase.SQL_COMMAND =>
- handleSqlCommand(command.getSqlCommand, clientId, responseObserver)
+ handleSqlCommand(command.getSqlCommand, sessionId, responseObserver)
case _ => throw new UnsupportedOperationException(s"$command not supported.")
}
}
def handleSqlCommand(
getSqlCommand: SqlCommand,
- clientId: String,
+ sessionId: String,
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
// Eagerly execute commands of the provided SQL string.
val df = session.sql(getSqlCommand.getSql, getSqlCommand.getArgsMap)
@@ -1537,12 +1537,12 @@ class SparkConnectPlanner(val session: SparkSession) {
responseObserver.onNext(
ExecutePlanResponse
.newBuilder()
- .setClientId(clientId)
+ .setSessionId(sessionId)
.setSqlCommandResult(result)
.build())
// Send Metrics
- SparkConnectStreamHandler.sendMetricsToResponse(clientId, df)
+ SparkConnectStreamHandler.sendMetricsToResponse(sessionId, df)
}
private def handleRegisterUserDefinedFunction(
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index e3d4da66a08..9520ec8015f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -35,7 +35,7 @@ private[connect] class SparkConnectAnalyzeHandler(
def handle(request: proto.AnalyzePlanRequest): Unit = {
val session =
SparkConnectService
- .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
+ .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
.session
session.withActive {
val response = process(request, session)
@@ -155,7 +155,7 @@ private[connect] class SparkConnectAnalyzeHandler(
case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
}
- builder.setClientId(request.getClientId)
+ builder.setSessionId(request.getSessionId)
builder.build()
}
}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
index 84f625222a8..38fd88297f3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
@@ -32,7 +32,7 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes
def handle(request: proto.ConfigRequest): Unit = {
val session =
SparkConnectService
- .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
+ .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
.session
val builder = request.getOperation.getOpTypeCase match {
@@ -53,7 +53,7 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes
case _ => throw new UnsupportedOperationException(s"${request.getOperation} not supported.")
}
- builder.setClientId(request.getClientId)
+ builder.setSessionId(request.getSessionId)
responseObserver.onNext(builder.build())
responseObserver.onCompleted()
}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 15d5a981ae8..0dd1741f099 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -44,7 +44,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
def handle(v: ExecutePlanRequest): Unit = {
val session =
SparkConnectService
- .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getClientId)
+ .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
.session
session.withActive {
v.getPlan.getOpTypeCase match {
@@ -60,12 +60,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
// Extract the plan from the request and convert it to a logical plan
val planner = new SparkConnectPlanner(session)
val dataframe = Dataset.ofRows(session, planner.transformRelation(request.getPlan.getRoot))
- processAsArrowBatches(request.getClientId, dataframe, responseObserver)
+ processAsArrowBatches(request.getSessionId, dataframe, responseObserver)
responseObserver.onNext(
- SparkConnectStreamHandler.sendMetricsToResponse(request.getClientId, dataframe))
+ SparkConnectStreamHandler.sendMetricsToResponse(request.getSessionId, dataframe))
if (dataframe.queryExecution.observedMetrics.nonEmpty) {
responseObserver.onNext(
- SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getClientId, dataframe))
+ SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getSessionId, dataframe))
}
responseObserver.onCompleted()
}
@@ -73,7 +73,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
private def handleCommand(session: SparkSession, request: ExecutePlanRequest): Unit = {
val command = request.getPlan.getCommand
val planner = new SparkConnectPlanner(session)
- planner.process(command, request.getClientId, responseObserver)
+ planner.process(command, request.getSessionId, responseObserver)
responseObserver.onCompleted()
}
}
@@ -96,7 +96,7 @@ object SparkConnectStreamHandler {
}
def processAsArrowBatches(
- clientId: String,
+ sessionId: String,
dataframe: DataFrame,
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
val spark = dataframe.sparkSession
@@ -173,7 +173,7 @@ object SparkConnectStreamHandler {
}
partition.foreach { case (bytes, count) =>
- val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
+ val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
val batch = proto.ExecutePlanResponse.ArrowBatch
.newBuilder()
.setRowCount(count)
@@ -191,7 +191,7 @@ object SparkConnectStreamHandler {
// Make sure at least 1 batch will be sent.
if (numSent == 0) {
val bytes = ArrowConverters.createEmptyArrowBatch(schema, timeZoneId)
- val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
+ val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
val batch = proto.ExecutePlanResponse.ArrowBatch
.newBuilder()
.setRowCount(0L)
@@ -203,17 +203,17 @@ object SparkConnectStreamHandler {
}
}
- def sendMetricsToResponse(clientId: String, rows: DataFrame): ExecutePlanResponse = {
+ def sendMetricsToResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = {
// Send a last batch with the metrics
ExecutePlanResponse
.newBuilder()
- .setClientId(clientId)
+ .setSessionId(sessionId)
.setMetrics(MetricGenerator.buildMetrics(rows.queryExecution.executedPlan))
.build()
}
def sendObservedMetricsToResponse(
- clientId: String,
+ sessionId: String,
dataframe: DataFrame): ExecutePlanResponse = {
val observedMetrics = dataframe.queryExecution.observedMetrics.map { case (name, row) =>
val cols = (0 until row.length).map(i => toConnectProtoValue(row(i)))
@@ -226,7 +226,7 @@ object SparkConnectStreamHandler {
// Prepare a response with the observed metrics.
ExecutePlanResponse
.newBuilder()
- .setClientId(clientId)
+ .setSessionId(sessionId)
.addAllObservedMetrics(observedMetrics.asJava)
.build()
}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 2885d0035bc..e2aecaaea86 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -221,7 +221,7 @@ class SparkConnectServiceSuite extends SharedSparkSession {
.newBuilder()
.setPlan(plan)
.setUserContext(context)
- .setClientId("session")
+ .setSessionId("session")
.build()
// The observer is executed inside this thread. So
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 2594640aa3e..8c85f17bb5f 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -712,7 +712,7 @@ class SparkConnectClient(object):
def _execute_plan_request_with_metadata(self) -> pb2.ExecutePlanRequest:
req = pb2.ExecutePlanRequest()
- req.client_id = self._session_id
+ req.session_id = self._session_id
req.client_type = self._builder.userAgent
if self._user_id:
req.user_context.user_id = self._user_id
@@ -720,7 +720,7 @@ class SparkConnectClient(object):
def _analyze_plan_request_with_metadata(self) -> pb2.AnalyzePlanRequest:
req = pb2.AnalyzePlanRequest()
- req.client_id = self._session_id
+ req.session_id = self._session_id
req.client_type = self._builder.userAgent
if self._user_id:
req.user_context.user_id = self._user_id
@@ -791,10 +791,10 @@ class SparkConnectClient(object):
):
with attempt:
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
- if resp.client_id != self._session_id:
+ if resp.session_id != self._session_id:
raise SparkConnectException(
"Received incorrect session identifier for request:"
- f"{resp.client_id} != {self._session_id}"
+ f"{resp.session_id} != {self._session_id}"
)
return AnalyzeResult.fromProto(resp)
raise SparkConnectException("Invalid state during retry exception handling.")
@@ -818,10 +818,10 @@ class SparkConnectClient(object):
):
with attempt:
for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
- if b.client_id != self._session_id:
+ if b.session_id != self._session_id:
raise SparkConnectException(
"Received incorrect session identifier for request: "
- f"{b.client_id} != {self._session_id}"
+ f"{b.session_id} != {self._session_id}"
)
except grpc.RpcError as rpc_error:
self._handle_error(rpc_error)
@@ -842,10 +842,10 @@ class SparkConnectClient(object):
with attempt:
batches = []
for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
- if b.client_id != self._session_id:
+ if b.session_id != self._session_id:
raise SparkConnectException(
"Received incorrect session identifier for request: "
- f"{b.client_id} != {self._session_id}"
+ f"{b.session_id} != {self._session_id}"
)
if b.metrics is not None:
logger.debug("Received metric batch.")
@@ -878,7 +878,7 @@ class SparkConnectClient(object):
def _config_request_with_metadata(self) -> pb2.ConfigRequest:
req = pb2.ConfigRequest()
- req.client_id = self._session_id
+ req.session_id = self._session_id
req.client_type = self._builder.userAgent
if self._user_id:
req.user_context.user_id = self._user_id
@@ -905,10 +905,10 @@ class SparkConnectClient(object):
):
with attempt:
resp = self._stub.Config(req, metadata=self._builder.metadata())
- if resp.client_id != self._session_id:
+ if resp.session_id != self._session_id:
raise SparkConnectException(
"Received incorrect session identifier for request:"
- f"{resp.client_id} != {self._session_id}"
+ f"{resp.session_id} != {self._session_id}"
)
return ConfigResult.fromProto(resp)
raise SparkConnectException("Invalid state during retry exception handling.")
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 6d41ce28c7c..c67e58b44cd 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
+ b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
)
@@ -620,101 +620,101 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_USERCONTEXT._serialized_start = 309
_USERCONTEXT._serialized_end = 431
_ANALYZEPLANREQUEST._serialized_start = 434
- _ANALYZEPLANREQUEST._serialized_end = 2089
- _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1295
- _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1344
- _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1347
- _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1662
- _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1490
- _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1662
- _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1664
- _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1717
- _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1719
- _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1769
- _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1771
- _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1825
- _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1827
- _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1880
- _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1882
- _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1896
- _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1898
- _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1939
- _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1941
- _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2062
- _ANALYZEPLANRESPONSE._serialized_start = 2092
- _ANALYZEPLANRESPONSE._serialized_end = 3294
- _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2862
- _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2919
- _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2921
- _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2969
- _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2971
- _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3016
- _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3018
- _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3054
- _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3056
- _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3104
- _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3106
- _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3140
- _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3142
- _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3182
- _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3184
- _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3243
- _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3245
- _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3284
- _EXECUTEPLANREQUEST._serialized_start = 3297
- _EXECUTEPLANREQUEST._serialized_end = 3504
- _EXECUTEPLANRESPONSE._serialized_start = 3507
- _EXECUTEPLANRESPONSE._serialized_end = 4731
- _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3962
- _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4033
- _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4035
- _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4096
- _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4099
- _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4616
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4194
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4526
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4403
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4526
- _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4528
- _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4616
- _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4618
- _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4714
- _KEYVALUE._serialized_start = 4733
- _KEYVALUE._serialized_end = 4798
- _CONFIGREQUEST._serialized_start = 4801
- _CONFIGREQUEST._serialized_end = 5827
- _CONFIGREQUEST_OPERATION._serialized_start = 5019
- _CONFIGREQUEST_OPERATION._serialized_end = 5517
- _CONFIGREQUEST_SET._serialized_start = 5519
- _CONFIGREQUEST_SET._serialized_end = 5571
- _CONFIGREQUEST_GET._serialized_start = 5573
- _CONFIGREQUEST_GET._serialized_end = 5598
- _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5600
- _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5663
- _CONFIGREQUEST_GETOPTION._serialized_start = 5665
- _CONFIGREQUEST_GETOPTION._serialized_end = 5696
- _CONFIGREQUEST_GETALL._serialized_start = 5698
- _CONFIGREQUEST_GETALL._serialized_end = 5746
- _CONFIGREQUEST_UNSET._serialized_start = 5748
- _CONFIGREQUEST_UNSET._serialized_end = 5775
- _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5777
- _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5811
- _CONFIGRESPONSE._serialized_start = 5829
- _CONFIGRESPONSE._serialized_end = 5949
- _ADDARTIFACTSREQUEST._serialized_start = 5952
- _ADDARTIFACTSREQUEST._serialized_end = 6767
- _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6299
- _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6352
- _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6354
- _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6465
- _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6467
- _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6560
- _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6563
- _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6756
- _ADDARTIFACTSRESPONSE._serialized_start = 6770
- _ADDARTIFACTSRESPONSE._serialized_end = 6958
- _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6877
- _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6958
- _SPARKCONNECTSERVICE._serialized_start = 6961
- _SPARKCONNECTSERVICE._serialized_end = 7326
+ _ANALYZEPLANREQUEST._serialized_end = 2091
+ _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1297
+ _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1346
+ _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1349
+ _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1664
+ _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1492
+ _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1664
+ _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1666
+ _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1719
+ _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1721
+ _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1771
+ _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1773
+ _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1827
+ _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1829
+ _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1882
+ _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1884
+ _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1898
+ _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1900
+ _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1941
+ _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1943
+ _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2064
+ _ANALYZEPLANRESPONSE._serialized_start = 2094
+ _ANALYZEPLANRESPONSE._serialized_end = 3298
+ _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2866
+ _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2923
+ _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2925
+ _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2973
+ _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2975
+ _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3020
+ _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3022
+ _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3058
+ _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3060
+ _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3108
+ _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3110
+ _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3144
+ _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3146
+ _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3186
+ _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3188
+ _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3247
+ _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3249
+ _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3288
+ _EXECUTEPLANREQUEST._serialized_start = 3301
+ _EXECUTEPLANREQUEST._serialized_end = 3510
+ _EXECUTEPLANRESPONSE._serialized_start = 3513
+ _EXECUTEPLANRESPONSE._serialized_end = 4739
+ _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3970
+ _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4041
+ _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4043
+ _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4104
+ _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4107
+ _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4624
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4202
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4534
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4411
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4534
+ _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4536
+ _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4624
+ _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4626
+ _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4722
+ _KEYVALUE._serialized_start = 4741
+ _KEYVALUE._serialized_end = 4806
+ _CONFIGREQUEST._serialized_start = 4809
+ _CONFIGREQUEST._serialized_end = 5837
+ _CONFIGREQUEST_OPERATION._serialized_start = 5029
+ _CONFIGREQUEST_OPERATION._serialized_end = 5527
+ _CONFIGREQUEST_SET._serialized_start = 5529
+ _CONFIGREQUEST_SET._serialized_end = 5581
+ _CONFIGREQUEST_GET._serialized_start = 5583
+ _CONFIGREQUEST_GET._serialized_end = 5608
+ _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5610
+ _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5673
+ _CONFIGREQUEST_GETOPTION._serialized_start = 5675
+ _CONFIGREQUEST_GETOPTION._serialized_end = 5706
+ _CONFIGREQUEST_GETALL._serialized_start = 5708
+ _CONFIGREQUEST_GETALL._serialized_end = 5756
+ _CONFIGREQUEST_UNSET._serialized_start = 5758
+ _CONFIGREQUEST_UNSET._serialized_end = 5785
+ _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5787
+ _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5821
+ _CONFIGRESPONSE._serialized_start = 5839
+ _CONFIGRESPONSE._serialized_end = 5961
+ _ADDARTIFACTSREQUEST._serialized_start = 5964
+ _ADDARTIFACTSREQUEST._serialized_end = 6781
+ _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6313
+ _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6366
+ _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6368
+ _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6479
+ _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6481
+ _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6574
+ _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6577
+ _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6770
+ _ADDARTIFACTSRESPONSE._serialized_start = 6784
+ _ADDARTIFACTSRESPONSE._serialized_end = 6972
+ _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6891
+ _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6972
+ _SPARKCONNECTSERVICE._serialized_start = 6975
+ _SPARKCONNECTSERVICE._serialized_end = 7340
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 2e9a877b658..e87194f31aa 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -350,7 +350,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
],
) -> None: ...
- CLIENT_ID_FIELD_NUMBER: builtins.int
+ SESSION_ID_FIELD_NUMBER: builtins.int
USER_CONTEXT_FIELD_NUMBER: builtins.int
CLIENT_TYPE_FIELD_NUMBER: builtins.int
SCHEMA_FIELD_NUMBER: builtins.int
@@ -362,11 +362,12 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
SPARK_VERSION_FIELD_NUMBER: builtins.int
DDL_PARSE_FIELD_NUMBER: builtins.int
SAME_SEMANTICS_FIELD_NUMBER: builtins.int
- client_id: builtins.str
+ session_id: builtins.str
"""(Required)
- The client_id is set by the client to be able to collate streaming responses from
- different queries.
+ The session_id specifies a spark session for a user id (which is specified
+ by user_context.user_id). The session_id is set by the client to be able to
+ collate streaming responses from different queries within the dedicated session.
"""
@property
def user_context(self) -> global___UserContext:
@@ -397,7 +398,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
def __init__(
self,
*,
- client_id: builtins.str = ...,
+ session_id: builtins.str = ...,
user_context: global___UserContext | None = ...,
client_type: builtins.str | None = ...,
schema: global___AnalyzePlanRequest.Schema | None = ...,
@@ -448,8 +449,6 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"_client_type",
"analyze",
b"analyze",
- "client_id",
- b"client_id",
"client_type",
b"client_type",
"ddl_parse",
@@ -466,6 +465,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"same_semantics",
"schema",
b"schema",
+ "session_id",
+ b"session_id",
"spark_version",
b"spark_version",
"tree_string",
@@ -638,7 +639,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
self, field_name: typing_extensions.Literal["result", b"result"]
) -> None: ...
- CLIENT_ID_FIELD_NUMBER: builtins.int
+ SESSION_ID_FIELD_NUMBER: builtins.int
SCHEMA_FIELD_NUMBER: builtins.int
EXPLAIN_FIELD_NUMBER: builtins.int
TREE_STRING_FIELD_NUMBER: builtins.int
@@ -648,7 +649,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
SPARK_VERSION_FIELD_NUMBER: builtins.int
DDL_PARSE_FIELD_NUMBER: builtins.int
SAME_SEMANTICS_FIELD_NUMBER: builtins.int
- client_id: builtins.str
+ session_id: builtins.str
@property
def schema(self) -> global___AnalyzePlanResponse.Schema: ...
@property
@@ -670,7 +671,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
def __init__(
self,
*,
- client_id: builtins.str = ...,
+ session_id: builtins.str = ...,
schema: global___AnalyzePlanResponse.Schema | None = ...,
explain: global___AnalyzePlanResponse.Explain | None = ...,
tree_string: global___AnalyzePlanResponse.TreeString | None = ...,
@@ -709,8 +710,6 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
def ClearField(
self,
field_name: typing_extensions.Literal[
- "client_id",
- b"client_id",
"ddl_parse",
b"ddl_parse",
"explain",
@@ -727,6 +726,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
b"same_semantics",
"schema",
b"schema",
+ "session_id",
+ b"session_id",
"spark_version",
b"spark_version",
"tree_string",
@@ -754,19 +755,24 @@ class ExecutePlanRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
- CLIENT_ID_FIELD_NUMBER: builtins.int
+ SESSION_ID_FIELD_NUMBER: builtins.int
USER_CONTEXT_FIELD_NUMBER: builtins.int
PLAN_FIELD_NUMBER: builtins.int
CLIENT_TYPE_FIELD_NUMBER: builtins.int
- client_id: builtins.str
+ session_id: builtins.str
"""(Required)
- The client_id is set by the client to be able to collate streaming responses from
- different queries.
+ The session_id specifies a spark session for a user id (which is specified
+ by user_context.user_id). The session_id is set by the client to be able to
+ collate streaming responses from different queries within the dedicated session.
"""
@property
def user_context(self) -> global___UserContext:
- """(Required) User context"""
+ """(Required) User context
+
+ user_context.user_id and session_id both identify a unique remote spark session on the
+ server side.
+ """
@property
def plan(self) -> global___Plan:
"""(Required) The logical plan to be executed / analyzed."""
@@ -778,7 +784,7 @@ class ExecutePlanRequest(google.protobuf.message.Message):
def __init__(
self,
*,
- client_id: builtins.str = ...,
+ session_id: builtins.str = ...,
user_context: global___UserContext | None = ...,
plan: global___Plan | None = ...,
client_type: builtins.str | None = ...,
@@ -801,12 +807,12 @@ class ExecutePlanRequest(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"_client_type",
b"_client_type",
- "client_id",
- b"client_id",
"client_type",
b"client_type",
"plan",
b"plan",
+ "session_id",
+ b"session_id",
"user_context",
b"user_context",
],
@@ -819,7 +825,7 @@ global___ExecutePlanRequest = ExecutePlanRequest
class ExecutePlanResponse(google.protobuf.message.Message):
"""The response of a query, can be one or more for each request. Responses belonging to the
- same input query, carry the same `client_id`.
+ same input query, carry the same `session_id`.
"""
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -995,13 +1001,13 @@ class ExecutePlanResponse(google.protobuf.message.Message):
self, field_name: typing_extensions.Literal["name", b"name", "values", b"values"]
) -> None: ...
- CLIENT_ID_FIELD_NUMBER: builtins.int
+ SESSION_ID_FIELD_NUMBER: builtins.int
ARROW_BATCH_FIELD_NUMBER: builtins.int
SQL_COMMAND_RESULT_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
METRICS_FIELD_NUMBER: builtins.int
OBSERVED_METRICS_FIELD_NUMBER: builtins.int
- client_id: builtins.str
+ session_id: builtins.str
@property
def arrow_batch(self) -> global___ExecutePlanResponse.ArrowBatch: ...
@property
@@ -1025,7 +1031,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
def __init__(
self,
*,
- client_id: builtins.str = ...,
+ session_id: builtins.str = ...,
arrow_batch: global___ExecutePlanResponse.ArrowBatch | None = ...,
sql_command_result: global___ExecutePlanResponse.SqlCommandResult | None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
@@ -1053,8 +1059,6 @@ class ExecutePlanResponse(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"arrow_batch",
b"arrow_batch",
- "client_id",
- b"client_id",
"extension",
b"extension",
"metrics",
@@ -1063,6 +1067,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
b"observed_metrics",
"response_type",
b"response_type",
+ "session_id",
+ b"session_id",
"sql_command_result",
b"sql_command_result",
],
@@ -1310,15 +1316,16 @@ class ConfigRequest(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["keys", b"keys"]) -> None: ...
- CLIENT_ID_FIELD_NUMBER: builtins.int
+ SESSION_ID_FIELD_NUMBER: builtins.int
USER_CONTEXT_FIELD_NUMBER: builtins.int
OPERATION_FIELD_NUMBER: builtins.int
CLIENT_TYPE_FIELD_NUMBER: builtins.int
- client_id: builtins.str
+ session_id: builtins.str
"""(Required)
- The client_id is set by the client to be able to collate streaming responses from
- different queries.
+ The session_id specifies a spark session for a user id (which is specified
+ by user_context.user_id). The session_id is set by the client to be able to
+ collate streaming responses from different queries within the dedicated session.
"""
@property
def user_context(self) -> global___UserContext:
@@ -1334,7 +1341,7 @@ class ConfigRequest(google.protobuf.message.Message):
def __init__(
self,
*,
- client_id: builtins.str = ...,
+ session_id: builtins.str = ...,
user_context: global___UserContext | None = ...,
operation: global___ConfigRequest.Operation | None = ...,
client_type: builtins.str | None = ...,
@@ -1357,12 +1364,12 @@ class ConfigRequest(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"_client_type",
b"_client_type",
- "client_id",
- b"client_id",
"client_type",
b"client_type",
"operation",
b"operation",
+ "session_id",
+ b"session_id",
"user_context",
b"user_context",
],
@@ -1378,10 +1385,10 @@ class ConfigResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
- CLIENT_ID_FIELD_NUMBER: builtins.int
+ SESSION_ID_FIELD_NUMBER: builtins.int
PAIRS_FIELD_NUMBER: builtins.int
WARNINGS_FIELD_NUMBER: builtins.int
- client_id: builtins.str
+ session_id: builtins.str
@property
def pairs(
self,
@@ -1402,14 +1409,14 @@ class ConfigResponse(google.protobuf.message.Message):
def __init__(
self,
*,
- client_id: builtins.str = ...,
+ session_id: builtins.str = ...,
pairs: collections.abc.Iterable[global___KeyValue] | None = ...,
warnings: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
- "client_id", b"client_id", "pairs", b"pairs", "warnings", b"warnings"
+ "pairs", b"pairs", "session_id", b"session_id", "warnings", b"warnings"
],
) -> None: ...
@@ -1546,14 +1553,17 @@ class AddArtifactsRequest(google.protobuf.message.Message):
],
) -> None: ...
- CLIENT_ID_FIELD_NUMBER: builtins.int
+ SESSION_ID_FIELD_NUMBER: builtins.int
USER_CONTEXT_FIELD_NUMBER: builtins.int
BATCH_FIELD_NUMBER: builtins.int
BEGIN_CHUNK_FIELD_NUMBER: builtins.int
CHUNK_FIELD_NUMBER: builtins.int
- client_id: builtins.str
- """The client_id is set by the client to be able to collate streaming responses from
- different queries.
+ session_id: builtins.str
+ """(Required)
+
+ The session_id specifies a spark session for a user id (which is specified
+ by user_context.user_id). The session_id is set by the client to be able to
+ collate streaming responses from different queries within the dedicated session.
"""
@property
def user_context(self) -> global___UserContext:
@@ -1574,7 +1584,7 @@ class AddArtifactsRequest(google.protobuf.message.Message):
def __init__(
self,
*,
- client_id: builtins.str = ...,
+ session_id: builtins.str = ...,
user_context: global___UserContext | None = ...,
batch: global___AddArtifactsRequest.Batch | None = ...,
begin_chunk: global___AddArtifactsRequest.BeginChunkedArtifact | None = ...,
@@ -1604,10 +1614,10 @@ class AddArtifactsRequest(google.protobuf.message.Message):
b"begin_chunk",
"chunk",
b"chunk",
- "client_id",
- b"client_id",
"payload",
b"payload",
+ "session_id",
+ b"session_id",
"user_context",
b"user_context",
],
diff --git a/python/pyspark/sql/tests/connect/test_client.py b/python/pyspark/sql/tests/connect/test_client.py
index 84281a6764f..6131e146363 100644
--- a/python/pyspark/sql/tests/connect/test_client.py
+++ b/python/pyspark/sql/tests/connect/test_client.py
@@ -64,7 +64,7 @@ class MockService:
def ExecutePlan(self, req: proto.ExecutePlanRequest, metadata):
self.req = req
resp = proto.ExecutePlanResponse()
- resp.client_id = self._session_id
+ resp.session_id = self._session_id
pdf = pd.DataFrame(data={"col1": [1, 2]})
schema = pa.Schema.from_pandas(pdf)