Posted to reviews@spark.apache.org by "grundprinzip (via GitHub)" <gi...@apache.org> on 2023/07/31 18:00:13 UTC

[GitHub] [spark] grundprinzip commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

grundprinzip commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279664210


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -1447,14 +1475,29 @@
       "The handle <handle> is invalid."
     ],
     "subClass" : {
-      "ALREADY_EXISTS" : {

Review Comment:
   Was this a 3.5-only change? Just to make sure we're not changing something here.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+class ExecutePlanResponseReattachableIterator(

Review Comment:
   Doc please
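
   E.g. something along these lines (rough sketch, based on my reading of the protocol in this PR):

       /**
        * Retryable iterator of ExecutePlanResponses to an ExecutePlan call.
        *
        * It can handle situations when the original ExecutePlan stream breaks (e.g. with a GRPC
        * network error) or is closed by the server before a ResponseComplete was received: it
        * transparently issues a ReattachExecute with the last seen response id and continues
        * iterating from there, so the consumer sees one uninterrupted response stream.
        */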



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -392,6 +400,12 @@ message ExecutePlanResponse {
     string name = 1;
     repeated Expression.Literal values = 2;
   }
+
+  message ResponseComplete {

Review Comment:
   Maybe ResultComplete to avoid confusion with the current response?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -62,6 +64,41 @@ object Connect {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
 
+  val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION =
+    ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamDuration")
+      .internal()
+      .doc("For reattachable execution, after this amount of time the response stream will be " +
+        "automatically completed and client needs to send a new ReattachExecute RPC to continue. " +

Review Comment:
   "Completed" is a weird term here, but I don't have a better suggestion immediately either.
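
   To spell out the flow I understand here (hypothetical client-side sketch, names are
   illustrative and not the actual API): the server calls onCompleted on the gRPC stream without
   having sent a ResponseComplete message, and the client reacts roughly like this:

       // Hypothetical sketch; reattach() stands in for the ReattachExecute RPC.
       var lastResponseId: Option[String] = None
       var resultComplete = false
       while (!resultComplete) {
         val stream = reattach(sessionId, operationId, lastResponseId)
         for (response <- stream) {
           process(response)
           lastResponseId = Some(response.getResponseId)
           resultComplete = response.hasResponseComplete
         }
         // If the server hit senderMaxStreamDuration, the stream ends here without
         // ResponseComplete and the loop reattaches from lastResponseId.
       }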



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -666,6 +680,108 @@ message InterruptResponse {
   repeated string interrupted_ids = 2;
 }
 
+message ReattachOptions {
+  // If true, the request can be reattached to using ReattachExecute.
+  // ReattachExecute can be used either if the stream broke with a GRPC network error,
+  // or if the server closed the stream without sending a response with StreamStatus.complete=true.
+  // The server will keep a buffer of responses in case a response is lost and
+  // ReattachExecute needs to back-track.
+  //
+  // If false, the execution response stream will not be reattachable, and all responses are
+  // immediately released by the server after being sent.
+  bool reattachable = 1;
+}
+
+message ReattachExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be the id of an existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session_id both identify a unique remote Spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be the id of an existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  // (Optional)
+  // Last already processed response id from the response stream.
+  // After reattach, server will resume the response stream after that response.
+  // If not specified, server will restart the stream from the start.
+  //
+  // Note: the server controls the number of responses that it buffers, and it may drop responses
+  // that are far behind the latest returned response, so this can't be used to arbitrarily
+  // scroll back the cursor. If the response is no longer available, this will result in an error.
+  optional string last_response_id = 5;
+}
+
+message ReleaseExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to release.
+  // This must be the id of an existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session_id both identify a unique remote Spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to release.
+  // This must be the id of an existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  ReleaseType release_type = 5;
+
+  enum ReleaseType {

Review Comment:
   If you have a parametrized enum that maybe needs message types anyway, why not model it directly with those messages? ReleaseAll and ReleaseUntil
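
   Something along these lines (just a sketch; names and field numbers are only a suggestion):

       message ReleaseAll {}

       message ReleaseUntil {
         // Release all responses up to and including the response with this id.
         string response_id = 1;
       }

       oneof release {
         ReleaseAll release_all = 5;
         ReleaseUntil release_until = 6;
       }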



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala:
##########
@@ -154,11 +206,27 @@ private[connect] class ExecuteResponseObserver[T](val executeHolder: ExecuteHold
   /**
    * Remove cached responses after response with lastReturnedIndex is returned from getResponse.
    * Remove according to caching policy:
-   *   - if query is not reattachable, remove all responses up to and including
-   *     highestConsumedIndex.
+   *   - if retryBufferSize is 0 (or query is not reattachable), remove all responses up to and
+   *     including lastSentIndex.
+   *   - otherwise keep responses backwards from lastSentIndex until their total size exceeds
+   *     retryBufferSize
    */
-  private def removeCachedResponses() = {
-    var i = highestConsumedIndex
+  private def removeCachedResponses(lastSentIndex: Long) = {
+    var i = lastSentIndex
+    var totalResponsesSize = 0L
+    while (i >= 1 && responses.get(i).isDefined && totalResponsesSize < retryBufferSize) {
+      totalResponsesSize += responses.get(i).get.serializedByteSize

Review Comment:
   Is the total response size only updated when we remove the items?
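
   To check my understanding, the policy is roughly this (standalone sketch, where responses maps
   stream index to serialized size; not the actual class):

       // Returns the indices that fall outside the retry buffer and can be removed.
       def indicesToRemove(
           responses: Map[Long, Long], lastSentIndex: Long, retryBufferSize: Long): Set[Long] = {
         var i = lastSentIndex
         var total = 0L
         // Walk backwards from the last sent response, accumulating serialized sizes,
         // until the retained responses exceed retryBufferSize.
         while (i >= 1 && responses.contains(i) && total < retryBufferSize) {
           total += responses(i)
           i -= 1
         }
         // Everything at index i and below is outside the buffer.
         responses.keySet.filter(_ <= i)
       }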



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -666,6 +680,108 @@ message InterruptResponse {
   repeated string interrupted_ids = 2;
 }
 
+message ReattachOptions {
+  // If true, the request can be reattached to using ReattachExecute.
+  // ReattachExecute can be used either if the stream broke with a GRPC network error,
+  // or if the server closed the stream without sending a response with StreamStatus.complete=true.
+  // The server will keep a buffer of responses in case a response is lost and
+  // ReattachExecute needs to back-track.
+  //
+  // If false, the execution response stream will not be reattachable, and all responses are
+  // immediately released by the server after being sent.
+  bool reattachable = 1;
+}
+
+message ReattachExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be the id of an existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session_id both identify a unique remote Spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be the id of an existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  // (Optional)
+  // Last already processed response id from the response stream.
+  // After reattach, server will resume the response stream after that response.
+  // If not specified, server will restart the stream from the start.
+  //
+  // Note: the server controls the number of responses that it buffers, and it may drop responses
+  // that are far behind the latest returned response, so this can't be used to arbitrarily
+  // scroll back the cursor. If the response is no longer available, this will result in an error.
+  optional string last_response_id = 5;
+}
+
+message ReleaseExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to release.
+  // This must be the id of an existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session_id both identify a unique remote Spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to release.
+  // This must be the id of an existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  ReleaseType release_type = 5;
+
+  enum ReleaseType {

Review Comment:
   This gets you around the unspecified-value issue as well.
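
   With messages in a oneof, "not set" is directly observable, whereas proto3 would force the
   enum to carry an explicit zero value that every consumer has to handle, e.g. (sketch):

       enum ReleaseType {
         RELEASE_TYPE_UNSPECIFIED = 0;
         RELEASE_TYPE_RELEASE_ALL = 1;
         RELEASE_TYPE_RELEASE_UNTIL = 2;
       }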



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -46,72 +57,118 @@ private[connect] class ExecuteGrpcResponseSender[T](grpcObserver: StreamObserver
 
   /**
    * Attach to the executionObserver, consume responses from it, and send them to grpcObserver.
+   *
+   * In non-reattachable execution, it will keep sending responses until the query finishes. In
+   * reattachable execution, it can finish earlier after reaching a time deadline or size limit.
+   *
+   * After this function finishes, the grpcObserver is closed with either onCompleted or onError.
+   *
    * @param lastConsumedStreamIndex
   *   the last index that was already consumed and sent. This sender will start from the index
   *   after that. 0 means start from the beginning (since the first response has index 1).
-   *
-   * @return
-   *   true if the execution was detached before stream completed. The caller needs to finish the
-   *   grpcObserver stream false if stream was finished. In this case, grpcObserver stream is
-   *   already completed.
    */
-  def run(
-      executionObserver: ExecuteResponseObserver[T],
-      lastConsumedStreamIndex: Long): Boolean = {
+  def run(lastConsumedStreamIndex: Long): Unit = {
+    logDebug(
+      s"GrpcResponseSender run for $executeHolder, " +
+        s"reattachable=${executeHolder.reattachable}, " +
+        s"lastConsumedStreamIndex=$lastConsumedStreamIndex")
+
     // register to be notified about available responses.
     executionObserver.attachConsumer(this)
 
     var nextIndex = lastConsumedStreamIndex + 1
     var finished = false
 
+    // Time at which this sender should finish if the response stream is not finished by then.
+    val deadlineTimeMillis = if (!executeHolder.reattachable) {
+      Long.MaxValue
+    } else {
+      val confSize =
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
+      if (confSize > 0) System.currentTimeMillis() + 1000 * confSize else Long.MaxValue

Review Comment:
   Does this not return a duration or something similar to avoid the manual conversion?
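
   E.g. something like this (untested sketch; the default value is just an example):

       // Declaring it as a time conf gives callers milliseconds directly
       // (needs import java.util.concurrent.TimeUnit):
       val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION =
         ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamDuration")
           .internal()
           .doc("...")
           .timeConf(TimeUnit.MILLISECONDS)
           .createWithDefaultString("2m")

       // and then here, no manual seconds-to-millis conversion:
       val confMillis =
         SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION)
       val deadlineTimeMillis =
         if (confMillis > 0) System.currentTimeMillis() + confMillis else Long.MaxValue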



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
