Posted to reviews@spark.apache.org by "grundprinzip (via GitHub)" <gi...@apache.org> on 2023/11/05 11:09:22 UTC

[PR] [SPARK-45798] Assert server-side session ID [spark]

grundprinzip opened a new pull request, #43664:
URL: https://github.com/apache/spark/pull/43664

   ### What changes were proposed in this pull request?
   Without this patch, when the server restarts because of an abnormal condition, the client does not realize that this has happened. For example, when a driver OOM occurs and the driver is restarted, the client does not notice that the server has restarted and that a new session has been assigned.
   
   This patch fixes this behavior by asserting that the server-side session ID does not change during the connection. If it does change, the client throws an exception like this:
   
   ```
   >>> spark.range(10).collect()
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/dataframe.py", line 1710, in collect
       table, schema = self._to_table()
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/dataframe.py", line 1722, in _to_table
       table, schema = self._session.client.to_table(query, self._plan.observations)
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/client/core.py", line 839, in to_table
       table, schema, _, _, _ = self._execute_and_fetch(req, observations)
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/client/core.py", line 1295, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(req, observations):
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/client/core.py", line 1273, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/client/core.py", line 1521, in _handle_error
       raise error
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/client/core.py", line 1266, in _execute_and_fetch_as_iterator
       yield from handle_response(b)
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/client/core.py", line 1193, in handle_response
       self._verify_response_integrity(b)
     File "/Users/martin.grund/Development/spark/python/pyspark/sql/connect/client/core.py", line 1622, in _verify_response_integrity
       raise SparkConnectException(
   pyspark.errors.exceptions.connect.SparkConnectException: Received incorrect server side session identifier for request. Please restart Spark Session. (9493c83d-cfa4-488f-9522-838ef3df90bf != c5302e8f-170d-477e-908d-299927b68fd8)
   ```
   
   ### Why are the changes needed?
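   Stability: the client detects a server restart instead of silently continuing against a new server-side session.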
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   - Existing tests cover the basic invariant.
   - Added new tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383030328


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -179,6 +184,9 @@ class ArtifactManager(
     val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
       private val summaries = mutable.Buffer.empty[ArtifactSummary]
       override def onNext(v: AddArtifactsResponse): Unit = {
+        if (v.getSessionId != sessionId) {
+          throw new IllegalStateException(s"Session ID mismatch: $sessionId != ${v.getSessionId}")
+        }

Review Comment:
   This would throw in the gRPC handler thread that calls this callback. I'm not sure what the effect of that would be...
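
Whatever the transport does with an exception raised inside the callback, one way to sidestep the question is to record the mismatch in the observer and raise it later on the caller's thread. The following is a minimal sketch of that pattern, not the PR's code: `CheckingObserver` and `deliver` are hypothetical names, and a plain `threading.Thread` stands in for the handler thread rather than any real gRPC API.

```python
import threading
from typing import List, Optional


class CheckingObserver:
    """Hypothetical stand-in for a stream observer; not the gRPC StreamObserver API."""

    def __init__(self, expected_session_id: str) -> None:
        self._expected = expected_session_id
        self._error: Optional[Exception] = None
        self._done = threading.Event()
        self.session_ids: List[str] = []

    def on_next(self, session_id: str) -> None:
        # Runs on the handler thread: record the first mismatch instead of raising here.
        if session_id != self._expected and self._error is None:
            self._error = RuntimeError(
                f"Session ID mismatch: {self._expected} != {session_id}"
            )
        self.session_ids.append(session_id)

    def on_completed(self) -> None:
        self._done.set()

    def result(self, timeout: float = 5.0) -> List[str]:
        # Runs on the caller's thread: this is where a recorded error surfaces.
        if not self._done.wait(timeout):
            raise TimeoutError("stream did not complete")
        if self._error is not None:
            raise self._error
        return self.session_ids


def deliver(observer: CheckingObserver, ids: List[str]) -> None:
    """Simulates a handler thread invoking the callbacks (an assumption for this sketch)."""

    def run() -> None:
        for sid in ids:
            observer.on_next(sid)
        observer.on_completed()

    worker = threading.Thread(target=run)
    worker.start()
    worker.join()


observer = CheckingObserver("s1")
deliver(observer, ["s1", "s2"])
try:
    observer.result()
except RuntimeError as exc:
    print(exc)
```

With this shape the failure is reported deterministically to the code waiting on the result, independent of how the callback thread treats uncaught exceptions.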





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43664:
URL: https://github.com/apache/spark/pull/43664#issuecomment-1804936516

   Merged to master.




Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384956789


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+class SparkConnectStubState extends Logging {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore it. New / Old message might not contain it and this
+    // behavior allows us to be compatible.
+    if (field != null) {
+      val value = response.getField(field)
+      serverSideSessionId match {
+        case Some(id) if value != id =>
+          throw new IllegalStateException(s"Server side session ID changed from $id to $value")

Review Comment:
   fixed



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+class SparkConnectStubState extends Logging {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore it. New / Old message might not contain it and this
+    // behavior allows us to be compatible.
+    if (field != null) {
+      val value = response.getField(field)
+      serverSideSessionId match {
+        case Some(id) if value != id =>

Review Comment:
   addressed





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384277531


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str:
                 return self._artifact_manager.cache_artifact(blob)
         raise SparkConnectException("Invalid state during retry exception handling.")
 
+    def _verify_response_integrity(
+        self,
+        response: Union[
+            pb2.ConfigResponse,
+            pb2.ExecutePlanResponse,
+            pb2.InterruptResponse,
+            pb2.ReleaseExecuteResponse,
+            pb2.AddArtifactsResponse,
+            pb2.AnalyzePlanResponse,
+            pb2.FetchErrorDetailsResponse,
+            pb2.ReleaseSessionResponse,
+        ],
+    ) -> None:
+        """
+        Verifies the integrity of the response. This method checks if the session ID and the
+        server-side session ID match. If not, it throws an exception.
+        Parameters
+        ----------
+        response - One of the different response types handled by the Spark Connect service
+        """
+        if self._session_id != response.session_id:
+            raise SparkConnectException(
+                "Received incorrect session identifier for request:"
+                f"{response.session_id} != {self._session_id}"
+            )

Review Comment:
   Is this an invalid state that should not occur? If so, could we raise `PySparkAssertionError` instead? 



##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str:
                 return self._artifact_manager.cache_artifact(blob)
         raise SparkConnectException("Invalid state during retry exception handling.")
 
+    def _verify_response_integrity(
+        self,
+        response: Union[
+            pb2.ConfigResponse,
+            pb2.ExecutePlanResponse,
+            pb2.InterruptResponse,
+            pb2.ReleaseExecuteResponse,
+            pb2.AddArtifactsResponse,
+            pb2.AnalyzePlanResponse,
+            pb2.FetchErrorDetailsResponse,
+            pb2.ReleaseSessionResponse,
+        ],
+    ) -> None:
+        """
+        Verifies the integrity of the response. This method checks if the session ID and the
+        server-side session ID match. If not, it throws an exception.
+        Parameters
+        ----------
+        response - One of the different response types handled by the Spark Connect service
+        """
+        if self._session_id != response.session_id:
+            raise SparkConnectException(
+                "Received incorrect session identifier for request:"
+                f"{response.session_id} != {self._session_id}"
+            )
+        if self._server_session_id is not None:
+            if response.server_side_session_id != self._server_session_id:
+                raise SparkConnectException(
+                    "Received incorrect server side session identifier for request. "
+                    "Please restart Spark Session. ("

Review Comment:
   Can we be more explicit in this error message on how to restart a Spark session?





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383025193


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -803,9 +836,13 @@ message ReleaseSessionRequest {
   optional string client_type = 3;
 }
 
+// Next ID: 3
 message ReleaseSessionResponse {
   // Session id of the session on which the release executed.
   string session_id = 1;
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 2;

Review Comment:
   Hm, wouldn't it then also come as an empty string from older servers?
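
In proto3, a plain `string` field that the sender never set reads back as the empty string, so a client cannot tell an older server from an empty ID by looking at the value alone. Below is a minimal sketch of one way a client could handle this, assuming the empty string is treated as "no ID provided". `check_server_session` is a hypothetical helper for illustration, not code from this PR.

```python
from typing import Optional


def check_server_session(pinned: Optional[str], received: str) -> Optional[str]:
    """Returns the ID to remember after this response; raises if a real ID changed."""
    if received == "":
        # Assumption: an empty value means the server did not send an ID
        # (for example, an older server), so the check is skipped.
        return pinned
    if pinned is not None and received != pinned:
        raise RuntimeError(
            f"Server side session ID changed from {pinned} to {received}"
        )
    return received


pinned = check_server_session(None, "")         # older server: nothing pinned yet
pinned = check_server_session(pinned, "abc")    # first real ID is pinned
pinned = check_server_session(pinned, "")       # empty value is ignored
pinned = check_server_session(pinned, "abc")    # unchanged ID passes
```

The trade-off of this choice is that a server which stops sending the ID is indistinguishable from one that never supported it.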





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1385540515


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client. In addition,
+// this class provides access to the commonly configured retry policy and exception conversion
+// logic.
+class SparkConnectStubState(
+    channel: ManagedChannel,
+    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends Logging {
+
+  // Responsible to convert the GRPC Status exceptions into Spark exceptions.
+  lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
+
+  // Manages the retry handler logic used by the stubs.
+  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
+
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {

Review Comment:
   Done



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -232,58 +282,9 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       assert(spark.streams.listListeners().length == 1) // only process termination listener
     } finally {
       SparkConnectService.stop()
+      Thread.sleep(4.seconds.toMillis)
       // remove process termination listener
       spark.streams.listListeners().foreach(spark.streams.removeListener)
     }
   }
-
-  test("python listener process: process terminates after listener is removed") {

Review Comment:
   Removed





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383019163


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -920,6 +958,12 @@ message FetchErrorDetailsResponse {
 
   // A list of errors.
   repeated Error errors = 2;
+
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 3;
+
+  string session_id = 4;

Review Comment:
   yeah, nested messages in the end...





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1382963356


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -920,6 +958,12 @@ message FetchErrorDetailsResponse {
 
   // A list of errors.
   repeated Error errors = 2;
+
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 3;
+
+  string session_id = 4;

Review Comment:
   nit: place it at the beginning of the response, like in others (I would put it even above the nested msgs)



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -803,9 +836,13 @@ message ReleaseSessionRequest {
   optional string client_type = 3;
 }
 
+// Next ID: 3
 message ReleaseSessionResponse {
   // Session id of the session on which the release executed.
   string session_id = 1;
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 2;

Review Comment:
   in this special case, should it be optional, because it will not be set when the session doesn't exist?
   will your wrapper handle that?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  protected def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore this for now.

Review Comment:
   Maybe log a warning.
   
   Better not to put "for now" in a comment unless there's an actionable follow-up (when do we plan to do something about it?).



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -97,6 +97,10 @@ class ExecutePlanResponseReattachableIterator(
   private[connect] var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
     Some(rawBlockingStub.executePlan(initialRequest))
 
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server.
+  private var serverSideSessionId: Option[String] = None

Review Comment:
   Technically, a restarted server will have no information about the operation, so trying to reattach would already fail with INVALID_HANDLE.OPERATION_NOT_FOUND.
   But it doesn't hurt.



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {

Review Comment:
   nit: move to its own file
   
   Also, what it does is not very different from RetryHandler and GrpcExceptionConverter. Why make this one a superclass, while the others are members?
   Why not make it a member ResponseValidator object and have responseValidator.wrapIterator and responseValidator.validateResponse instead of a superclass?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None

Review Comment:
   This state will be separate in the blocking and non-blocking stubs, which could technically lead to the two operating with different IDs. Should there be a common stub-state object for that?
   
   If you make it a ResponseValidator object, it can share this state there.
   
   Why not, in general, have the CustomSparkConnectCommonStub object be a container that holds instances of ResponseValidator, RetryHandler and GrpcExceptionConverter that can be shared between the blocking and non-blocking stubs?
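
The container idea can be sketched in a few lines. This is an illustrative Python sketch under stated assumptions, not the Scala code in the PR: `ResponseValidator`, `StubState`, `BlockingStub` and `AsyncStub` are hypothetical classes, and the lock is an assumption added because both stubs may validate responses from different threads.

```python
import threading
from typing import Optional


class ResponseValidator:
    """Hypothetical validator holding the single pinned server-side session ID."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._server_side_session_id: Optional[str] = None

    def validate(self, server_side_session_id: str) -> None:
        with self._lock:
            if self._server_side_session_id is None:
                # First response seen by either stub pins the ID.
                self._server_side_session_id = server_side_session_id
            elif self._server_side_session_id != server_side_session_id:
                raise RuntimeError(
                    "Server side session ID changed from "
                    f"{self._server_side_session_id} to {server_side_session_id}"
                )


class StubState:
    """Container shared by both stubs, so they cannot drift to different IDs."""

    def __init__(self) -> None:
        self.response_validator = ResponseValidator()


class BlockingStub:
    def __init__(self, state: StubState) -> None:
        self._state = state

    def handle(self, server_side_session_id: str) -> None:
        self._state.response_validator.validate(server_side_session_id)


class AsyncStub:
    def __init__(self, state: StubState) -> None:
        self._state = state

    def handle(self, server_side_session_id: str) -> None:
        self._state.response_validator.validate(server_side_session_id)


state = StubState()
blocking, non_blocking = BlockingStub(state), AsyncStub(state)
blocking.handle("id-1")          # pins the ID through the blocking stub
try:
    non_blocking.handle("id-2")  # the other stub sees the same pinned ID
except RuntimeError as exc:
    print(exc)
```

Because both stubs hold the same `StubState`, an ID pinned through one is enforced by the other, which is the property the comment above asks for.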





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383825497


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None

Review Comment:
   I moved it to shared state between the stubs



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {

Review Comment:
   Refactored





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383861338


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  protected def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore this for now.

Review Comment:
   Done





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384511928


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str:
                 return self._artifact_manager.cache_artifact(blob)
         raise SparkConnectException("Invalid state during retry exception handling.")
 
+    def _verify_response_integrity(
+        self,
+        response: Union[
+            pb2.ConfigResponse,
+            pb2.ExecutePlanResponse,
+            pb2.InterruptResponse,
+            pb2.ReleaseExecuteResponse,
+            pb2.AddArtifactsResponse,
+            pb2.AnalyzePlanResponse,
+            pb2.FetchErrorDetailsResponse,
+            pb2.ReleaseSessionResponse,
+        ],
+    ) -> None:
+        """
+        Verifies the integrity of the response. This method checks if the session ID and the
+        server-side session ID match. If not, it throws an exception.
+        Parameters
+        ----------
+        response - One of the different response types handled by the Spark Connect service
+        """
+        if self._session_id != response.session_id:
+            raise SparkConnectException(
+                "Received incorrect session identifier for request:"
+                f"{response.session_id} != {self._session_id}"
+            )
+        if self._server_session_id is not None:
+            if response.server_side_session_id != self._server_session_id:
+                raise SparkConnectException(
+                    "Received incorrect server side session identifier for request. "
+                    "Please restart Spark Session. ("

Review Comment:
   I modified it to "Please create a new Spark Session to reconnect". But how exactly the new Spark Session is created really depends on the user.
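
For readers following along, the check discussed here can be sketched as a small standalone class. This is a minimal sketch, not the PR's code: `FakeResponse`, `SessionMismatchError` and `ResponseChecker` are hypothetical stand-ins for the pb2 response types, `SparkConnectException` and the client, and pinning the server-side ID on the first response that carries one is an assumption based on the comments in the Scala diff.

```python
from dataclasses import dataclass
from typing import Optional


class SessionMismatchError(Exception):
    """Hypothetical stand-in for SparkConnectException."""


@dataclass
class FakeResponse:
    """Hypothetical stand-in for a Spark Connect response message."""

    session_id: str
    server_side_session_id: str = ""


class ResponseChecker:
    """Hypothetical holder for the two IDs the client compares against."""

    def __init__(self, session_id: str) -> None:
        self._session_id = session_id
        # Assumption: pinned on the first response that carries a server-side ID.
        self._server_session_id: Optional[str] = None

    def verify(self, response: FakeResponse) -> None:
        # The client-generated session ID must be echoed back unchanged.
        if response.session_id != self._session_id:
            raise SessionMismatchError(
                "Received incorrect session identifier for request: "
                f"{response.session_id} != {self._session_id}"
            )
        if self._server_session_id is None:
            if response.server_side_session_id:
                self._server_session_id = response.server_side_session_id
        elif response.server_side_session_id != self._server_session_id:
            # A different server-side ID means the server created a new session.
            raise SessionMismatchError(
                "Received incorrect server side session identifier for request. "
                "Please create a new Spark Session to reconnect. "
                f"({response.server_side_session_id} != {self._server_session_id})"
            )
```

In this sketch a restarted server shows up as a second response whose `server_side_session_id` differs from the pinned one, which raises instead of silently continuing on a fresh session.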





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383024310


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -803,9 +836,13 @@ message ReleaseSessionRequest {
   optional string client_type = 3;
 }
 
+// Next ID: 3
 message ReleaseSessionResponse {
   // Session id of the session on which the release executed.
   string session_id = 1;
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 2;

Review Comment:
   I believe it will not be null but an empty string, since "" is the default value for a primitive string type?





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384861491


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+class SparkConnectStubState extends Logging {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore it. New / Old message might not contain it and this
+    // behavior allows us to be compatible.
+    if (field != null) {
+      val value = response.getField(field)
+      serverSideSessionId match {
+        case Some(id) if value != id =>

Review Comment:
   ```suggestion
           case Some(id) if value != id && value != "" =>
   ```





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #43664:
URL: https://github.com/apache/spark/pull/43664#issuecomment-1802396221

   https://github.com/grundprinzip/spark/actions/runs/6796293583/job/18489636721 looks like it flaked out after already running all connect tests successfully.




Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384956471


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+class SparkConnectStubState extends Logging {

Review Comment:
   I've refactored this as you proposed.





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384849886


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+class SparkConnectStubState extends Logging {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore it. New / Old message might not contain it and this
+    // behavior allows us to be compatible.
+    if (field != null) {
+      val value = response.getField(field)
+      serverSideSessionId match {
+        case Some(id) if value != id =>
+          throw new IllegalStateException(s"Server side session ID changed from $id to $value")

Review Comment:
   If ReleaseSession returns empty string as server_side_session_id, will it not throw here?
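
The failure mode asked about here can be illustrated with a small Python sketch of the pinning logic. `PinnedSessionId` and its `ignore_empty` flag are hypothetical names for illustration; the guard mirrors the `value != ""` condition suggested elsewhere in this thread, and `RuntimeError` stands in for the Scala `IllegalStateException`.

```python
from typing import Optional


class PinnedSessionId:
    """Hypothetical helper that pins the first server-side session ID it sees."""

    def __init__(self, ignore_empty: bool) -> None:
        self._pinned: Optional[str] = None
        self._ignore_empty = ignore_empty

    def check(self, value: str) -> None:
        if self._ignore_empty and value == "":
            # An unset proto3 string arrives as "", so treat it like a
            # response that does not carry the field at all.
            return
        if self._pinned is None:
            self._pinned = value
        elif value != self._pinned:
            raise RuntimeError(
                f"Server side session ID changed from {self._pinned} to {value}"
            )


# Without the guard, an empty ID after a pinned one is reported as a change.
strict = PinnedSessionId(ignore_empty=False)
strict.check("srv-1")
try:
    strict.check("")
    strict_raised = False
except RuntimeError:
    strict_raised = True

# With the guard, the empty ID is skipped and the pinned ID stays in force.
lenient = PinnedSessionId(ignore_empty=True)
lenient.check("srv-1")
lenient.check("")
lenient.check("srv-1")
```

Under these assumptions the unguarded variant would throw on a response whose server-side ID is left at its default, while the guarded variant still rejects a genuinely different ID.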





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383049383


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala:
##########
@@ -201,7 +201,9 @@ private[connect] class SparkConnectAnalyzeHandler(
       case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
     }
 
-    builder.setSessionId(request.getSessionId)
+    builder
+      .setSessionId(request.getSessionId)
+      .setServerSideSessionId(session.sessionUUID)

Review Comment:
   Out of an overabundance of caution, instead of calling sessionHolder.sessionUUID directly, I would add a method to SessionHolder:
   
   ```
   def serverSessionId = {
     // The server side session id must be different from the client generated one
     assert(session.sessionUUID != sessionId)
     session.sessionUUID
   }
   ```
   
   This prevents someone in the future from syncing session.sessionUUID with the connect session_id, which would make this idempotency check ineffective.





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383886945


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala:
##########
@@ -201,7 +201,9 @@ private[connect] class SparkConnectAnalyzeHandler(
       case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
     }
 
-    builder.setSessionId(request.getSessionId)
+    builder
+      .setSessionId(request.getSessionId)
+      .setServerSideSessionId(session.sessionUUID)

Review Comment:
   Done





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383865603


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -179,6 +184,9 @@ class ArtifactManager(
     val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
       private val summaries = mutable.Buffer.empty[ArtifactSummary]
       override def onNext(v: AddArtifactsResponse): Unit = {
+        if (v.getSessionId != sessionId) {
+          throw new IllegalStateException(s"Session ID mismatch: $sessionId != ${v.getSessionId}")
+        }

Review Comment:
   Fixed this to call onError()
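
As a rough illustration of the change described here, the sketch below reports a session ID mismatch through the observer's error callback instead of raising from inside the per-message callback. `CheckingObserver` is a hypothetical Python analogue of a gRPC `StreamObserver`, not the Scala code from the PR, and responses are reduced to their session ID string for brevity.

```python
from typing import List, Optional


class CheckingObserver:
    """Hypothetical observer that validates the session ID of each response."""

    def __init__(self, session_id: str) -> None:
        self._session_id = session_id
        self.received: List[str] = []
        self.error: Optional[Exception] = None

    def on_next(self, response_session_id: str) -> None:
        if self.error is not None:
            # The stream already failed; ignore anything that still arrives.
            return
        if response_session_id != self._session_id:
            # Route the failure through the error path rather than raising here.
            self.on_error(
                ValueError(
                    f"Session ID mismatch: {self._session_id} != {response_session_id}"
                )
            )
            return
        self.received.append(response_session_id)

    def on_error(self, exc: Exception) -> None:
        self.error = exc


observer = CheckingObserver("abc")
observer.on_next("abc")
observer.on_next("xyz")  # mismatch is recorded via on_error
observer.on_next("abc")  # dropped because the stream has already failed
```

The design choice in this sketch is that whoever waits on the observer sees the mismatch the same way as any other stream failure, presumably the intent of calling `onError()`.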





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383018735


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -803,9 +836,13 @@ message ReleaseSessionRequest {
   optional string client_type = 3;
 }
 
+// Next ID: 3
 message ReleaseSessionResponse {
   // Session id of the session on which the release executed.
   string session_id = 1;
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 2;

Review Comment:
   Yes, the wrapper will ignore a null server_side_session_id, which is the same case as staying backwards compatible with older servers.





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #43664: [SPARK-45798][CONNECT] Assert server-side session ID
URL: https://github.com/apache/spark/pull/43664




Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384806224


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -803,9 +836,13 @@ message ReleaseSessionRequest {
   optional string client_type = 3;
 }
 
+// Next ID: 3
 message ReleaseSessionResponse {
   // Session id of the session on which the release executed.
   string session_id = 1;
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 2;

Review Comment:
   It is "" and I checked that the code properly deals with ""





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1384868257


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+class SparkConnectStubState extends Logging {

Review Comment:
   I would call this `ResponseValidator`
   And have SparkConnectStubState be an object that contains all of ResponseValidator, GrpcRetryHandler etc





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1385239481


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -232,58 +282,9 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       assert(spark.streams.listListeners().length == 1) // only process termination listener
     } finally {
       SparkConnectService.stop()
+      Thread.sleep(4.seconds.toMillis)
       // remove process termination listener
       spark.streams.listListeners().foreach(spark.streams.removeListener)
     }
   }
-
-  test("python listener process: process terminates after listener is removed") {

Review Comment:
   is there a reason for moving this test above?
   Is this and the `Thread.sleep(4.seconds.toMillis)` some debug leftover?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import com.google.protobuf.GeneratedMessageV3
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.internal.Logging
+
+// This is common state shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client. In addition,
+// this class provides access to the commonly configured retry policy and exception conversion
+// logic.
+class SparkConnectStubState(
+    channel: ManagedChannel,
+    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends Logging {
+
+  // Responsible to convert the GRPC Status exceptions into Spark exceptions.
+  lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
+
+  // Manages the retry handler logic used by the stubs.
+  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
+
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {

Review Comment:
   I would move `verifyResponse` / `wrapIterator` / `wrapStreamObserver` into a separate object that this state holds a reference to (GrpcResponseValidator? ResponseIntegrityChecks?), but this could be a followup later.





Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1383036026


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -179,6 +184,9 @@ class ArtifactManager(
     val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
       private val summaries = mutable.Buffer.empty[ArtifactSummary]
       override def onNext(v: AddArtifactsResponse): Unit = {
+        if (v.getSessionId != sessionId) {
+          throw new IllegalStateException(s"Session ID mismatch: $sessionId != ${v.getSessionId}")
+        }

Review Comment:
   SparkConnectAddArtifactsHandler doesn't seem to be setting sessionid


