Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/07 02:11:40 UTC

[spark] branch branch-3.4 updated: [SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 2bce5314da3 [SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id
2bce5314da3 is described below

commit 2bce5314da32c1fac7a6025449cb786c9b5c233b
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Mon Mar 6 22:11:18 2023 -0400

    [SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id
    
    ### What changes were proposed in this pull request?
    
    Rename the `client_id` field in the Connect proto requests to `session_id`.
    
    On the one hand, when I read `client_id` I was confused about what it is used for, even after reading the proto documentation.
    
    On the other hand, the clients already use `session_id` (see the sketch after the links below):
    https://github.com/apache/spark/blob/9bf174f9722e34f13bfaede5e59f989bf2a511e9/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala#L51
    https://github.com/apache/spark/blob/9bf174f9722e34f13bfaede5e59f989bf2a511e9/python/pyspark/sql/connect/client.py#L522
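
    For illustration, a request built by the Scala client after this rename looks roughly like the sketch below (the builder methods come from the regenerated proto code in this diff; `plan`, `userContext`, `sessionId`, and `userAgent` stand in for the client's existing fields):

        val request = proto.ExecutePlanRequest
          .newBuilder()
          .setPlan(plan)                // (Required) logical plan to execute
          .setUserContext(userContext)  // carries user_context.user_id
          .setSessionId(sessionId)      // previously setClientId
          .setClientType(userAgent)
          .build()
        stub.executePlan(request)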
    
    ### Why are the changes needed?
    
    Code readability
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Existing UT
    
    Closes #40309 from amaliujia/update_client_id.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit dfdc4a1d65e69ed88d43e17bd7325e9f8416c8e6)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../sql/connect/client/SparkConnectClient.scala    |   6 +-
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |   4 +-
 .../connect/client/SparkConnectClientSuite.scala   |  14 +-
 .../src/main/protobuf/spark/connect/base.proto     |  41 +++--
 .../sql/connect/planner/SparkConnectPlanner.scala  |  10 +-
 .../service/SparkConnectAnalyzeHandler.scala       |   4 +-
 .../service/SparkConnectConfigHandler.scala        |   4 +-
 .../service/SparkConnectStreamHandler.scala        |  24 +--
 .../connect/planner/SparkConnectServiceSuite.scala |   2 +-
 python/pyspark/sql/connect/client.py               |  22 +--
 python/pyspark/sql/connect/proto/base_pb2.py       | 196 ++++++++++-----------
 python/pyspark/sql/connect/proto/base_pb2.pyi      |  98 ++++++-----
 python/pyspark/sql/tests/connect/test_client.py    |   2 +-
 13 files changed, 223 insertions(+), 204 deletions(-)
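
For context, the server pairs the renamed `session_id` with `user_context.user_id` to resolve the isolated session a request belongs to. A minimal sketch of that handler-side lookup, assuming the `getOrCreateIsolatedSession` helper that appears in the handler diffs below (body abbreviated):

    def handle(request: proto.ExecutePlanRequest): Unit = {
      // user_id and session_id together identify one isolated remote Spark session
      val session = SparkConnectService
        .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
        .session
      session.withActive {
        // plan / command handling, as in SparkConnectStreamHandler in this diff
      }
    }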

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 05aa191a4dd..348fc94bb89 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -63,7 +63,7 @@ private[sql] class SparkConnectClient(
       .newBuilder()
       .setPlan(plan)
       .setUserContext(userContext)
-      .setClientId(sessionId)
+      .setSessionId(sessionId)
       .setClientType(userAgent)
       .build()
     stub.executePlan(request)
@@ -78,7 +78,7 @@ private[sql] class SparkConnectClient(
     val request = proto.ConfigRequest
       .newBuilder()
       .setOperation(operation)
-      .setClientId(sessionId)
+      .setSessionId(sessionId)
       .setClientType(userAgent)
       .setUserContext(userContext)
       .build()
@@ -157,7 +157,7 @@ private[sql] class SparkConnectClient(
   private def analyze(builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = {
     val request = builder
       .setUserContext(userContext)
-      .setClientId(sessionId)
+      .setSessionId(sessionId)
       .setClientType(userAgent)
       .build()
     analyze(request)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 11e28f538e8..94bc22ef77d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -612,8 +612,8 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("SparkSession newSession") {
-    val oldId = spark.sql("SELECT 1").analyze.getClientId
-    val newId = spark.newSession().sql("SELECT 1").analyze.getClientId
+    val oldId = spark.sql("SELECT 1").analyze.getSessionId
+    val newId = spark.newSession().sql("SELECT 1").analyze.getSessionId
     assert(oldId != newId)
   }
 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index dcb13589206..bc600e5a071 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -75,11 +75,11 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     client = clientBuilder(server.getPort)
     val request = AnalyzePlanRequest
       .newBuilder()
-      .setClientId("abc123")
+      .setSessionId("abc123")
       .build()
 
     val response = client.analyze(request)
-    assert(response.getClientId === "abc123")
+    assert(response.getSessionId === "abc123")
   }
 
   test("Test connection") {
@@ -99,7 +99,7 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
       .connectionString(s"sc://localhost:${server.getPort}/;use_ssl=true")
       .build()
 
-    val request = AnalyzePlanRequest.newBuilder().setClientId("abc123").build()
+    val request = AnalyzePlanRequest.newBuilder().setSessionId("abc123").build()
 
     // Failed the ssl handshake as the dummy server does not have any server credentials installed.
     assertThrows[StatusRuntimeException] {
@@ -201,11 +201,11 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
       request: ExecutePlanRequest,
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
     // Reply with a dummy response using the same client ID
-    val requestClientId = request.getClientId
+    val requestSessionId = request.getSessionId
     inputPlan = request.getPlan
     val response = ExecutePlanResponse
       .newBuilder()
-      .setClientId(requestClientId)
+      .setSessionId(requestSessionId)
       .build()
     responseObserver.onNext(response)
     responseObserver.onCompleted()
@@ -215,7 +215,7 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
       request: AnalyzePlanRequest,
       responseObserver: StreamObserver[AnalyzePlanResponse]): Unit = {
     // Reply with a dummy response using the same client ID
-    val requestClientId = request.getClientId
+    val requestSessionId = request.getSessionId
     request.getAnalyzeCase match {
       case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
         inputPlan = request.getSchema.getPlan
@@ -233,7 +233,7 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
     }
     val response = AnalyzePlanResponse
       .newBuilder()
-      .setClientId(requestClientId)
+      .setSessionId(requestSessionId)
       .build()
     responseObserver.onNext(response)
     responseObserver.onCompleted()
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 2252d91c9ff..1a9c437f0ec 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -58,9 +58,10 @@ message UserContext {
 message AnalyzePlanRequest {
   // (Required)
   //
-  // The client_id is set by the client to be able to collate streaming responses from
-  // different queries.
-  string client_id = 1;
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id). The session_id is set by the client to be able to
+  // collate streaming responses from different queries within the dedicated session.
+  string session_id = 1;
 
   // (Required) User context
   UserContext user_context = 2;
@@ -161,7 +162,7 @@ message AnalyzePlanRequest {
 // Response to performing analysis of the query. Contains relevant metadata to be able to
 // reason about the performance.
 message AnalyzePlanResponse {
-  string client_id = 1;
+  string session_id = 1;
 
   oneof result {
     Schema schema = 2;
@@ -217,11 +218,15 @@ message AnalyzePlanResponse {
 message ExecutePlanRequest {
   // (Required)
   //
-  // The client_id is set by the client to be able to collate streaming responses from
-  // different queries.
-  string client_id = 1;
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id). The session_id is set by the client to be able to
+  // collate streaming responses from different queries within the dedicated session.
+  string session_id = 1;
 
   // (Required) User context
+  //
+  // user_context.user_id and session+id both identify a unique remote spark session on the
+  // server side.
   UserContext user_context = 2;
 
   // (Required) The logical plan to be executed / analyzed.
@@ -234,9 +239,9 @@ message ExecutePlanRequest {
 }
 
 // The response of a query, can be one or more for each request. Responses belonging to the
-// same input query, carry the same `client_id`.
+// same input query, carry the same `session_id`.
 message ExecutePlanResponse {
-  string client_id = 1;
+  string session_id = 1;
 
   // Union type for the different response messages.
   oneof response_type {
@@ -304,9 +309,10 @@ message KeyValue {
 message ConfigRequest {
   // (Required)
   //
-  // The client_id is set by the client to be able to collate streaming responses from
-  // different queries.
-  string client_id = 1;
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id). The session_id is set by the client to be able to
+  // collate streaming responses from different queries within the dedicated session.
+  string session_id = 1;
 
   // (Required) User context
   UserContext user_context = 2;
@@ -369,7 +375,7 @@ message ConfigRequest {
 
 // Response to the config request.
 message ConfigResponse {
-  string client_id = 1;
+  string session_id = 1;
 
   // (Optional) The result key-value pairs.
   //
@@ -386,9 +392,12 @@ message ConfigResponse {
 // Request to transfer client-local artifacts.
 message AddArtifactsRequest {
 
-  // The client_id is set by the client to be able to collate streaming responses from
-  // different queries.
-  string client_id = 1;
+  // (Required)
+  //
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id). The session_id is set by the client to be able to
+  // collate streaming responses from different queries within the dedicated session.
+  string session_id = 1;
 
   // User context
   UserContext user_context = 2;
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 60fb94e8098..8ca004d520c 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1459,7 +1459,7 @@ class SparkConnectPlanner(val session: SparkSession) {
 
   def process(
       command: proto.Command,
-      clientId: String,
+      sessionId: String,
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
     command.getCommandTypeCase match {
       case proto.Command.CommandTypeCase.REGISTER_FUNCTION =>
@@ -1473,14 +1473,14 @@ class SparkConnectPlanner(val session: SparkSession) {
       case proto.Command.CommandTypeCase.EXTENSION =>
         handleCommandPlugin(command.getExtension)
       case proto.Command.CommandTypeCase.SQL_COMMAND =>
-        handleSqlCommand(command.getSqlCommand, clientId, responseObserver)
+        handleSqlCommand(command.getSqlCommand, sessionId, responseObserver)
       case _ => throw new UnsupportedOperationException(s"$command not supported.")
     }
   }
 
   def handleSqlCommand(
       getSqlCommand: SqlCommand,
-      clientId: String,
+      sessionId: String,
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
     // Eagerly execute commands of the provided SQL string.
     val df = session.sql(getSqlCommand.getSql, getSqlCommand.getArgsMap)
@@ -1537,12 +1537,12 @@ class SparkConnectPlanner(val session: SparkSession) {
     responseObserver.onNext(
       ExecutePlanResponse
         .newBuilder()
-        .setClientId(clientId)
+        .setSessionId(sessionId)
         .setSqlCommandResult(result)
         .build())
 
     // Send Metrics
-    SparkConnectStreamHandler.sendMetricsToResponse(clientId, df)
+    SparkConnectStreamHandler.sendMetricsToResponse(sessionId, df)
   }
 
   private def handleRegisterUserDefinedFunction(
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index e3d4da66a08..9520ec8015f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -35,7 +35,7 @@ private[connect] class SparkConnectAnalyzeHandler(
   def handle(request: proto.AnalyzePlanRequest): Unit = {
     val session =
       SparkConnectService
-        .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
+        .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
         .session
     session.withActive {
       val response = process(request, session)
@@ -155,7 +155,7 @@ private[connect] class SparkConnectAnalyzeHandler(
       case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
     }
 
-    builder.setClientId(request.getClientId)
+    builder.setSessionId(request.getSessionId)
     builder.build()
   }
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
index 84f625222a8..38fd88297f3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala
@@ -32,7 +32,7 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes
   def handle(request: proto.ConfigRequest): Unit = {
     val session =
       SparkConnectService
-        .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
+        .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId)
         .session
 
     val builder = request.getOperation.getOpTypeCase match {
@@ -53,7 +53,7 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes
       case _ => throw new UnsupportedOperationException(s"${request.getOperation} not supported.")
     }
 
-    builder.setClientId(request.getClientId)
+    builder.setSessionId(request.getSessionId)
     responseObserver.onNext(builder.build())
     responseObserver.onCompleted()
   }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 15d5a981ae8..0dd1741f099 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -44,7 +44,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
   def handle(v: ExecutePlanRequest): Unit = {
     val session =
       SparkConnectService
-        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getClientId)
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
         .session
     session.withActive {
       v.getPlan.getOpTypeCase match {
@@ -60,12 +60,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     // Extract the plan from the request and convert it to a logical plan
     val planner = new SparkConnectPlanner(session)
     val dataframe = Dataset.ofRows(session, planner.transformRelation(request.getPlan.getRoot))
-    processAsArrowBatches(request.getClientId, dataframe, responseObserver)
+    processAsArrowBatches(request.getSessionId, dataframe, responseObserver)
     responseObserver.onNext(
-      SparkConnectStreamHandler.sendMetricsToResponse(request.getClientId, dataframe))
+      SparkConnectStreamHandler.sendMetricsToResponse(request.getSessionId, dataframe))
     if (dataframe.queryExecution.observedMetrics.nonEmpty) {
       responseObserver.onNext(
-        SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getClientId, dataframe))
+        SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getSessionId, dataframe))
     }
     responseObserver.onCompleted()
   }
@@ -73,7 +73,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
   private def handleCommand(session: SparkSession, request: ExecutePlanRequest): Unit = {
     val command = request.getPlan.getCommand
     val planner = new SparkConnectPlanner(session)
-    planner.process(command, request.getClientId, responseObserver)
+    planner.process(command, request.getSessionId, responseObserver)
     responseObserver.onCompleted()
   }
 }
@@ -96,7 +96,7 @@ object SparkConnectStreamHandler {
   }
 
   def processAsArrowBatches(
-      clientId: String,
+      sessionId: String,
       dataframe: DataFrame,
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
     val spark = dataframe.sparkSession
@@ -173,7 +173,7 @@ object SparkConnectStreamHandler {
           }
 
           partition.foreach { case (bytes, count) =>
-            val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
+            val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
             val batch = proto.ExecutePlanResponse.ArrowBatch
               .newBuilder()
               .setRowCount(count)
@@ -191,7 +191,7 @@ object SparkConnectStreamHandler {
       // Make sure at least 1 batch will be sent.
       if (numSent == 0) {
         val bytes = ArrowConverters.createEmptyArrowBatch(schema, timeZoneId)
-        val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
+        val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
         val batch = proto.ExecutePlanResponse.ArrowBatch
           .newBuilder()
           .setRowCount(0L)
@@ -203,17 +203,17 @@ object SparkConnectStreamHandler {
     }
   }
 
-  def sendMetricsToResponse(clientId: String, rows: DataFrame): ExecutePlanResponse = {
+  def sendMetricsToResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = {
     // Send a last batch with the metrics
     ExecutePlanResponse
       .newBuilder()
-      .setClientId(clientId)
+      .setSessionId(sessionId)
       .setMetrics(MetricGenerator.buildMetrics(rows.queryExecution.executedPlan))
       .build()
   }
 
   def sendObservedMetricsToResponse(
-      clientId: String,
+      sessionId: String,
       dataframe: DataFrame): ExecutePlanResponse = {
     val observedMetrics = dataframe.queryExecution.observedMetrics.map { case (name, row) =>
       val cols = (0 until row.length).map(i => toConnectProtoValue(row(i)))
@@ -226,7 +226,7 @@ object SparkConnectStreamHandler {
     // Prepare a response with the observed metrics.
     ExecutePlanResponse
       .newBuilder()
-      .setClientId(clientId)
+      .setSessionId(sessionId)
       .addAllObservedMetrics(observedMetrics.asJava)
       .build()
   }
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 2885d0035bc..e2aecaaea86 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -221,7 +221,7 @@ class SparkConnectServiceSuite extends SharedSparkSession {
       .newBuilder()
       .setPlan(plan)
       .setUserContext(context)
-      .setClientId("session")
+      .setSessionId("session")
       .build()
 
     // The observer is executed inside this thread. So
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 2594640aa3e..8c85f17bb5f 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -712,7 +712,7 @@ class SparkConnectClient(object):
 
     def _execute_plan_request_with_metadata(self) -> pb2.ExecutePlanRequest:
         req = pb2.ExecutePlanRequest()
-        req.client_id = self._session_id
+        req.session_id = self._session_id
         req.client_type = self._builder.userAgent
         if self._user_id:
             req.user_context.user_id = self._user_id
@@ -720,7 +720,7 @@ class SparkConnectClient(object):
 
     def _analyze_plan_request_with_metadata(self) -> pb2.AnalyzePlanRequest:
         req = pb2.AnalyzePlanRequest()
-        req.client_id = self._session_id
+        req.session_id = self._session_id
         req.client_type = self._builder.userAgent
         if self._user_id:
             req.user_context.user_id = self._user_id
@@ -791,10 +791,10 @@ class SparkConnectClient(object):
             ):
                 with attempt:
                     resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
-                    if resp.client_id != self._session_id:
+                    if resp.session_id != self._session_id:
                         raise SparkConnectException(
                             "Received incorrect session identifier for request:"
-                            f"{resp.client_id} != {self._session_id}"
+                            f"{resp.session_id} != {self._session_id}"
                         )
                     return AnalyzeResult.fromProto(resp)
             raise SparkConnectException("Invalid state during retry exception handling.")
@@ -818,10 +818,10 @@ class SparkConnectClient(object):
             ):
                 with attempt:
                     for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
-                        if b.client_id != self._session_id:
+                        if b.session_id != self._session_id:
                             raise SparkConnectException(
                                 "Received incorrect session identifier for request: "
-                                f"{b.client_id} != {self._session_id}"
+                                f"{b.session_id} != {self._session_id}"
                             )
         except grpc.RpcError as rpc_error:
             self._handle_error(rpc_error)
@@ -842,10 +842,10 @@ class SparkConnectClient(object):
                 with attempt:
                     batches = []
                     for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
-                        if b.client_id != self._session_id:
+                        if b.session_id != self._session_id:
                             raise SparkConnectException(
                                 "Received incorrect session identifier for request: "
-                                f"{b.client_id} != {self._session_id}"
+                                f"{b.session_id} != {self._session_id}"
                             )
                         if b.metrics is not None:
                             logger.debug("Received metric batch.")
@@ -878,7 +878,7 @@ class SparkConnectClient(object):
 
     def _config_request_with_metadata(self) -> pb2.ConfigRequest:
         req = pb2.ConfigRequest()
-        req.client_id = self._session_id
+        req.session_id = self._session_id
         req.client_type = self._builder.userAgent
         if self._user_id:
             req.user_context.user_id = self._user_id
@@ -905,10 +905,10 @@ class SparkConnectClient(object):
             ):
                 with attempt:
                     resp = self._stub.Config(req, metadata=self._builder.metadata())
-                    if resp.client_id != self._session_id:
+                    if resp.session_id != self._session_id:
                         raise SparkConnectException(
                             "Received incorrect session identifier for request:"
-                            f"{resp.client_id} != {self._session_id}"
+                            f"{resp.session_id} != {self._session_id}"
                         )
                     return ConfigResult.fromProto(resp)
             raise SparkConnectException("Invalid state during retry exception handling.")
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 6d41ce28c7c..c67e58b44cd 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
 )
 
 
@@ -620,101 +620,101 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _USERCONTEXT._serialized_start = 309
     _USERCONTEXT._serialized_end = 431
     _ANALYZEPLANREQUEST._serialized_start = 434
-    _ANALYZEPLANREQUEST._serialized_end = 2089
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1295
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1344
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1347
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1662
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1490
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1662
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1664
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1717
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1719
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1769
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1771
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1825
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1827
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1880
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1882
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1896
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1898
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1939
-    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1941
-    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2062
-    _ANALYZEPLANRESPONSE._serialized_start = 2092
-    _ANALYZEPLANRESPONSE._serialized_end = 3294
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2862
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2919
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2921
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2969
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2971
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3016
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3018
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3054
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3056
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3104
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3106
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3140
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3142
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3182
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3184
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3243
-    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3245
-    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3284
-    _EXECUTEPLANREQUEST._serialized_start = 3297
-    _EXECUTEPLANREQUEST._serialized_end = 3504
-    _EXECUTEPLANRESPONSE._serialized_start = 3507
-    _EXECUTEPLANRESPONSE._serialized_end = 4731
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3962
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4033
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4035
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4096
-    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4099
-    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4616
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4194
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4526
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4403
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4526
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4528
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4616
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4618
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4714
-    _KEYVALUE._serialized_start = 4733
-    _KEYVALUE._serialized_end = 4798
-    _CONFIGREQUEST._serialized_start = 4801
-    _CONFIGREQUEST._serialized_end = 5827
-    _CONFIGREQUEST_OPERATION._serialized_start = 5019
-    _CONFIGREQUEST_OPERATION._serialized_end = 5517
-    _CONFIGREQUEST_SET._serialized_start = 5519
-    _CONFIGREQUEST_SET._serialized_end = 5571
-    _CONFIGREQUEST_GET._serialized_start = 5573
-    _CONFIGREQUEST_GET._serialized_end = 5598
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5600
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5663
-    _CONFIGREQUEST_GETOPTION._serialized_start = 5665
-    _CONFIGREQUEST_GETOPTION._serialized_end = 5696
-    _CONFIGREQUEST_GETALL._serialized_start = 5698
-    _CONFIGREQUEST_GETALL._serialized_end = 5746
-    _CONFIGREQUEST_UNSET._serialized_start = 5748
-    _CONFIGREQUEST_UNSET._serialized_end = 5775
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5777
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5811
-    _CONFIGRESPONSE._serialized_start = 5829
-    _CONFIGRESPONSE._serialized_end = 5949
-    _ADDARTIFACTSREQUEST._serialized_start = 5952
-    _ADDARTIFACTSREQUEST._serialized_end = 6767
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6299
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6352
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6354
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6465
-    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6467
-    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6560
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6563
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6756
-    _ADDARTIFACTSRESPONSE._serialized_start = 6770
-    _ADDARTIFACTSRESPONSE._serialized_end = 6958
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6877
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6958
-    _SPARKCONNECTSERVICE._serialized_start = 6961
-    _SPARKCONNECTSERVICE._serialized_end = 7326
+    _ANALYZEPLANREQUEST._serialized_end = 2091
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1297
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1346
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1349
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1664
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1492
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1664
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1666
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1719
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1721
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1771
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1773
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1827
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1829
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1882
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1884
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1898
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1900
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1941
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1943
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2064
+    _ANALYZEPLANRESPONSE._serialized_start = 2094
+    _ANALYZEPLANRESPONSE._serialized_end = 3298
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2866
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2923
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2925
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2973
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2975
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3020
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3022
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3058
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3060
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3108
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3110
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3144
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3146
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3186
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3188
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3247
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3249
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3288
+    _EXECUTEPLANREQUEST._serialized_start = 3301
+    _EXECUTEPLANREQUEST._serialized_end = 3510
+    _EXECUTEPLANRESPONSE._serialized_start = 3513
+    _EXECUTEPLANRESPONSE._serialized_end = 4739
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3970
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4041
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4043
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4104
+    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4107
+    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4624
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4202
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4534
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4411
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4534
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4536
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4624
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4626
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4722
+    _KEYVALUE._serialized_start = 4741
+    _KEYVALUE._serialized_end = 4806
+    _CONFIGREQUEST._serialized_start = 4809
+    _CONFIGREQUEST._serialized_end = 5837
+    _CONFIGREQUEST_OPERATION._serialized_start = 5029
+    _CONFIGREQUEST_OPERATION._serialized_end = 5527
+    _CONFIGREQUEST_SET._serialized_start = 5529
+    _CONFIGREQUEST_SET._serialized_end = 5581
+    _CONFIGREQUEST_GET._serialized_start = 5583
+    _CONFIGREQUEST_GET._serialized_end = 5608
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5610
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5673
+    _CONFIGREQUEST_GETOPTION._serialized_start = 5675
+    _CONFIGREQUEST_GETOPTION._serialized_end = 5706
+    _CONFIGREQUEST_GETALL._serialized_start = 5708
+    _CONFIGREQUEST_GETALL._serialized_end = 5756
+    _CONFIGREQUEST_UNSET._serialized_start = 5758
+    _CONFIGREQUEST_UNSET._serialized_end = 5785
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5787
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5821
+    _CONFIGRESPONSE._serialized_start = 5839
+    _CONFIGRESPONSE._serialized_end = 5961
+    _ADDARTIFACTSREQUEST._serialized_start = 5964
+    _ADDARTIFACTSREQUEST._serialized_end = 6781
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6313
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6366
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6368
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6479
+    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6481
+    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6574
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6577
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6770
+    _ADDARTIFACTSRESPONSE._serialized_start = 6784
+    _ADDARTIFACTSRESPONSE._serialized_end = 6972
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6891
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6972
+    _SPARKCONNECTSERVICE._serialized_start = 6975
+    _SPARKCONNECTSERVICE._serialized_end = 7340
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 2e9a877b658..e87194f31aa 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -350,7 +350,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             ],
         ) -> None: ...
 
-    CLIENT_ID_FIELD_NUMBER: builtins.int
+    SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
     SCHEMA_FIELD_NUMBER: builtins.int
@@ -362,11 +362,12 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     SPARK_VERSION_FIELD_NUMBER: builtins.int
     DDL_PARSE_FIELD_NUMBER: builtins.int
     SAME_SEMANTICS_FIELD_NUMBER: builtins.int
-    client_id: builtins.str
+    session_id: builtins.str
     """(Required)
 
-    The client_id is set by the client to be able to collate streaming responses from
-    different queries.
+    The session_id specifies a spark session for a user id (which is specified
+    by user_context.user_id). The session_id is set by the client to be able to
+    collate streaming responses from different queries within the dedicated session.
     """
     @property
     def user_context(self) -> global___UserContext:
@@ -397,7 +398,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     def __init__(
         self,
         *,
-        client_id: builtins.str = ...,
+        session_id: builtins.str = ...,
         user_context: global___UserContext | None = ...,
         client_type: builtins.str | None = ...,
         schema: global___AnalyzePlanRequest.Schema | None = ...,
@@ -448,8 +449,6 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"_client_type",
             "analyze",
             b"analyze",
-            "client_id",
-            b"client_id",
             "client_type",
             b"client_type",
             "ddl_parse",
@@ -466,6 +465,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"same_semantics",
             "schema",
             b"schema",
+            "session_id",
+            b"session_id",
             "spark_version",
             b"spark_version",
             "tree_string",
@@ -638,7 +639,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             self, field_name: typing_extensions.Literal["result", b"result"]
         ) -> None: ...
 
-    CLIENT_ID_FIELD_NUMBER: builtins.int
+    SESSION_ID_FIELD_NUMBER: builtins.int
     SCHEMA_FIELD_NUMBER: builtins.int
     EXPLAIN_FIELD_NUMBER: builtins.int
     TREE_STRING_FIELD_NUMBER: builtins.int
@@ -648,7 +649,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
     SPARK_VERSION_FIELD_NUMBER: builtins.int
     DDL_PARSE_FIELD_NUMBER: builtins.int
     SAME_SEMANTICS_FIELD_NUMBER: builtins.int
-    client_id: builtins.str
+    session_id: builtins.str
     @property
     def schema(self) -> global___AnalyzePlanResponse.Schema: ...
     @property
@@ -670,7 +671,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
     def __init__(
         self,
         *,
-        client_id: builtins.str = ...,
+        session_id: builtins.str = ...,
         schema: global___AnalyzePlanResponse.Schema | None = ...,
         explain: global___AnalyzePlanResponse.Explain | None = ...,
         tree_string: global___AnalyzePlanResponse.TreeString | None = ...,
@@ -709,8 +710,6 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
-            "client_id",
-            b"client_id",
             "ddl_parse",
             b"ddl_parse",
             "explain",
@@ -727,6 +726,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             b"same_semantics",
             "schema",
             b"schema",
+            "session_id",
+            b"session_id",
             "spark_version",
             b"spark_version",
             "tree_string",
@@ -754,19 +755,24 @@ class ExecutePlanRequest(google.protobuf.message.Message):
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
-    CLIENT_ID_FIELD_NUMBER: builtins.int
+    SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     PLAN_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
-    client_id: builtins.str
+    session_id: builtins.str
     """(Required)
 
-    The client_id is set by the client to be able to collate streaming responses from
-    different queries.
+    The session_id specifies a spark session for a user id (which is specified
+    by user_context.user_id). The session_id is set by the client to be able to
+    collate streaming responses from different queries within the dedicated session.
     """
     @property
     def user_context(self) -> global___UserContext:
-        """(Required) User context"""
+        """(Required) User context
+
+        user_context.user_id and session+id both identify a unique remote spark session on the
+        server side.
+        """
     @property
     def plan(self) -> global___Plan:
         """(Required) The logical plan to be executed / analyzed."""
@@ -778,7 +784,7 @@ class ExecutePlanRequest(google.protobuf.message.Message):
     def __init__(
         self,
         *,
-        client_id: builtins.str = ...,
+        session_id: builtins.str = ...,
         user_context: global___UserContext | None = ...,
         plan: global___Plan | None = ...,
         client_type: builtins.str | None = ...,
@@ -801,12 +807,12 @@ class ExecutePlanRequest(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "_client_type",
             b"_client_type",
-            "client_id",
-            b"client_id",
             "client_type",
             b"client_type",
             "plan",
             b"plan",
+            "session_id",
+            b"session_id",
             "user_context",
             b"user_context",
         ],
@@ -819,7 +825,7 @@ global___ExecutePlanRequest = ExecutePlanRequest
 
 class ExecutePlanResponse(google.protobuf.message.Message):
     """The response of a query, can be one or more for each request. Responses belonging to the
-    same input query, carry the same `client_id`.
+    same input query, carry the same `session_id`.
     """
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -995,13 +1001,13 @@ class ExecutePlanResponse(google.protobuf.message.Message):
             self, field_name: typing_extensions.Literal["name", b"name", "values", b"values"]
         ) -> None: ...
 
-    CLIENT_ID_FIELD_NUMBER: builtins.int
+    SESSION_ID_FIELD_NUMBER: builtins.int
     ARROW_BATCH_FIELD_NUMBER: builtins.int
     SQL_COMMAND_RESULT_FIELD_NUMBER: builtins.int
     EXTENSION_FIELD_NUMBER: builtins.int
     METRICS_FIELD_NUMBER: builtins.int
     OBSERVED_METRICS_FIELD_NUMBER: builtins.int
-    client_id: builtins.str
+    session_id: builtins.str
     @property
     def arrow_batch(self) -> global___ExecutePlanResponse.ArrowBatch: ...
     @property
@@ -1025,7 +1031,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
     def __init__(
         self,
         *,
-        client_id: builtins.str = ...,
+        session_id: builtins.str = ...,
         arrow_batch: global___ExecutePlanResponse.ArrowBatch | None = ...,
         sql_command_result: global___ExecutePlanResponse.SqlCommandResult | None = ...,
         extension: google.protobuf.any_pb2.Any | None = ...,
@@ -1053,8 +1059,6 @@ class ExecutePlanResponse(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "arrow_batch",
             b"arrow_batch",
-            "client_id",
-            b"client_id",
             "extension",
             b"extension",
             "metrics",
@@ -1063,6 +1067,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
             b"observed_metrics",
             "response_type",
             b"response_type",
+            "session_id",
+            b"session_id",
             "sql_command_result",
             b"sql_command_result",
         ],
@@ -1310,15 +1316,16 @@ class ConfigRequest(google.protobuf.message.Message):
         ) -> None: ...
         def ClearField(self, field_name: typing_extensions.Literal["keys", b"keys"]) -> None: ...
 
-    CLIENT_ID_FIELD_NUMBER: builtins.int
+    SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     OPERATION_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
-    client_id: builtins.str
+    session_id: builtins.str
     """(Required)
 
-    The client_id is set by the client to be able to collate streaming responses from
-    different queries.
+    The session_id specifies a spark session for a user id (which is specified
+    by user_context.user_id). The session_id is set by the client to be able to
+    collate streaming responses from different queries within the dedicated session.
     """
     @property
     def user_context(self) -> global___UserContext:
@@ -1334,7 +1341,7 @@ class ConfigRequest(google.protobuf.message.Message):
     def __init__(
         self,
         *,
-        client_id: builtins.str = ...,
+        session_id: builtins.str = ...,
         user_context: global___UserContext | None = ...,
         operation: global___ConfigRequest.Operation | None = ...,
         client_type: builtins.str | None = ...,
@@ -1357,12 +1364,12 @@ class ConfigRequest(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "_client_type",
             b"_client_type",
-            "client_id",
-            b"client_id",
             "client_type",
             b"client_type",
             "operation",
             b"operation",
+            "session_id",
+            b"session_id",
             "user_context",
             b"user_context",
         ],
@@ -1378,10 +1385,10 @@ class ConfigResponse(google.protobuf.message.Message):
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
-    CLIENT_ID_FIELD_NUMBER: builtins.int
+    SESSION_ID_FIELD_NUMBER: builtins.int
     PAIRS_FIELD_NUMBER: builtins.int
     WARNINGS_FIELD_NUMBER: builtins.int
-    client_id: builtins.str
+    session_id: builtins.str
     @property
     def pairs(
         self,
@@ -1402,14 +1409,14 @@ class ConfigResponse(google.protobuf.message.Message):
     def __init__(
         self,
         *,
-        client_id: builtins.str = ...,
+        session_id: builtins.str = ...,
         pairs: collections.abc.Iterable[global___KeyValue] | None = ...,
         warnings: collections.abc.Iterable[builtins.str] | None = ...,
     ) -> None: ...
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
-            "client_id", b"client_id", "pairs", b"pairs", "warnings", b"warnings"
+            "pairs", b"pairs", "session_id", b"session_id", "warnings", b"warnings"
         ],
     ) -> None: ...
 
@@ -1546,14 +1553,17 @@ class AddArtifactsRequest(google.protobuf.message.Message):
             ],
         ) -> None: ...
 
-    CLIENT_ID_FIELD_NUMBER: builtins.int
+    SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     BATCH_FIELD_NUMBER: builtins.int
     BEGIN_CHUNK_FIELD_NUMBER: builtins.int
     CHUNK_FIELD_NUMBER: builtins.int
-    client_id: builtins.str
-    """The client_id is set by the client to be able to collate streaming responses from
-    different queries.
+    session_id: builtins.str
+    """(Required)
+
+    The session_id specifies a spark session for a user id (which is specified
+    by user_context.user_id). The session_id is set by the client to be able to
+    collate streaming responses from different queries within the dedicated session.
     """
     @property
     def user_context(self) -> global___UserContext:
@@ -1574,7 +1584,7 @@ class AddArtifactsRequest(google.protobuf.message.Message):
     def __init__(
         self,
         *,
-        client_id: builtins.str = ...,
+        session_id: builtins.str = ...,
         user_context: global___UserContext | None = ...,
         batch: global___AddArtifactsRequest.Batch | None = ...,
         begin_chunk: global___AddArtifactsRequest.BeginChunkedArtifact | None = ...,
@@ -1604,10 +1614,10 @@ class AddArtifactsRequest(google.protobuf.message.Message):
             b"begin_chunk",
             "chunk",
             b"chunk",
-            "client_id",
-            b"client_id",
             "payload",
             b"payload",
+            "session_id",
+            b"session_id",
             "user_context",
             b"user_context",
         ],
diff --git a/python/pyspark/sql/tests/connect/test_client.py b/python/pyspark/sql/tests/connect/test_client.py
index 84281a6764f..6131e146363 100644
--- a/python/pyspark/sql/tests/connect/test_client.py
+++ b/python/pyspark/sql/tests/connect/test_client.py
@@ -64,7 +64,7 @@ class MockService:
     def ExecutePlan(self, req: proto.ExecutePlanRequest, metadata):
         self.req = req
         resp = proto.ExecutePlanResponse()
-        resp.client_id = self._session_id
+        resp.session_id = self._session_id
 
         pdf = pd.DataFrame(data={"col1": [1, 2]})
         schema = pa.Schema.from_pandas(pdf)

