Posted to reviews@spark.apache.org by "juliuszsompolski (via GitHub)" <gi...@apache.org> on 2023/07/30 18:25:08 UTC

[GitHub] [spark] juliuszsompolski opened a new pull request, #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

juliuszsompolski opened a new pull request, #42228:
URL: https://github.com/apache/spark/pull/42228

   ### What changes were proposed in this pull request?
   
   (Draft)
   Done: server happy path
   Todo: server unhappy cleanup, client.
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon closed pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect
URL: https://github.com/apache/spark/pull/42228




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1278756050


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+class ExecutePlanResponseReattachableIterator(
+    request: proto.ExecutePlanRequest,
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends java.util.Iterator[proto.ExecutePlanResponse]

Review Comment:
   Should it be `CloseableIterator` instead?





[GitHub] [spark] HyukjinKwon commented on pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42228:
URL: https://github.com/apache/spark/pull/42228#issuecomment-1659561556

   Sure, made a followup: https://github.com/apache/spark/pull/42254




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1280222878


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+/**
+ * Retryable iterator of ExecutePlanResponses to an ExecutePlan call.
+ *
+ * It can handle situations when:
+ *   - the ExecutePlanResponse stream was broken by retryable network error (governed by
+ *     retryPolicy)
+ *   - the ExecutePlanResponse was gracefully ended by the server without a ResultComplete
+ *     message; this tells the client that there is more, and it should reattach to continue.
+ *
+ * Initial iterator is the result of an ExecutePlan on the request, but it can be reattached with
+ * ReattachExecute request. ReattachExecute request is provided the responseId of last returned
+ * ExecutePlanResponse on the iterator to return a new iterator from server that continues after
+ * that.
+ *
+ * Since in reattachable execute the server does buffer some responses in case the client needs to
+ * backtrack
+ */
+class ExecutePlanResponseReattachableIterator(
+    request: proto.ExecutePlanRequest,
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends java.util.Iterator[proto.ExecutePlanResponse]
+    with Logging {
+
+  val operationId = if (request.hasOperationId) {
+    request.getOperationId
+  } else {
+    // Add operation id, if not present.
+    // with operationId set by the client, the client can use it to try to reattach on error
+    // even before getting the first response. If the operation in fact didn't even reach the
+    // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND error.
+    UUID.randomUUID.toString
+  }
+
+  // Need raw stubs, don't want retry handling or error conversion done by the custom stubs.
+  // - this does its own custom retry handling
+  // - error conversion is wrapped around this in CustomSparkConnectBlockingStub,
+  //   this needs raw GRPC errors for retries.
+  private val rawBlockingStub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private val rawAsyncStub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  private val initialRequest: proto.ExecutePlanRequest = request
+    .toBuilder()
+    .addRequestOptions(
+      proto.ExecutePlanRequest.RequestOption
+        .newBuilder()
+        .setReattachOptions(proto.ReattachOptions.newBuilder().setReattachable(true).build())
+        .build())
+    .setOperationId(operationId)
+    .build()
+
+  // ResponseId of the last response returned by next()
+  private var lastReturnedResponseId: Option[String] = None
+
+  // True after ResponseComplete message was seen in the stream.
+  // Server will always send this message at the end of the stream, if the underlying iterator
+  // finishes without producing one, another iterator needs to be reattached.
+  private var responseComplete: Boolean = false
+
+  // Initial iterator comes from ExecutePlan request.
+  private var iterator: java.util.Iterator[proto.ExecutePlanResponse] =
+    rawBlockingStub.executePlan(initialRequest)
+
+  override def next(): proto.ExecutePlanResponse = synchronized {
+    // hasNext will trigger reattach in case the stream completed without responseComplete
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException()
+    }
+
+    // Get next response, possibly triggering reattach in case of stream error.
+    var firstTry = true
+    val ret = retry {
+      if (firstTry) {
+        // on first try, we use the existing iterator.
+        firstTry = false
+      } else {
+        // on retry, the iterator is borked, so we need a new one
+        iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+      }
+      iterator.next()

Review Comment:
   qq: can this iterator be empty?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1280078095


##########
core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala:
##########
@@ -149,7 +149,7 @@ class SparkThrowableSuite extends SparkFunSuite {
     checkIfUnique(messageFormats)
   }
 
-  test("Error classes match with document") {
+  ignore("Error classes match with document") {

Review Comment:
   https://github.com/juliuszsompolski/apache-spark/actions/runs/5720841151/job/15504640463
   
   This fails. Since I am also actively working on this item, I will merge with this test ignored for now. It should be easy to fix, so I will leave it to you, @juliuszsompolski.





[GitHub] [spark] juliuszsompolski commented on pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #42228:
URL: https://github.com/apache/spark/pull/42228#issuecomment-1659769032

   We call hasNext above to ensure that it's not empty at this point.
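
   The guard described here can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `Response`, `ReattachingIterator`, and the `reattach` callback are hypothetical stand-ins for the proto response, the real iterator, and the ReattachExecute RPC, and retry handling for stream errors is left out.

```scala
// Simplified stand-in for ExecutePlanResponse; not the real proto class.
final case class Response(id: String, resultComplete: Boolean)

// `reattach` is a hypothetical callback standing in for the ReattachExecute RPC.
// It receives the id of the last returned response (if any).
class ReattachingIterator(
    initial: Iterator[Response],
    reattach: Option[String] => Iterator[Response])
    extends Iterator[Response] {

  private var current: Iterator[Response] = initial
  private var lastReturnedId: Option[String] = None
  private var complete = false

  override def hasNext: Boolean = {
    if (complete) {
      false
    } else {
      // A stream that ends before the completion marker means "more to come":
      // keep reattaching until a stream actually yields an element.
      while (!current.hasNext) {
        current = reattach(lastReturnedId)
      }
      true
    }
  }

  override def next(): Response = {
    if (!hasNext) throw new NoSuchElementException()
    // hasNext returned true just above, so `current` is non-empty here.
    val r = current.next()
    lastReturnedId = Some(r.id)
    if (r.resultComplete) complete = true
    r
  }
}
```

   Because next() always goes through hasNext first, the underlying stream is only read after it has been confirmed (or replaced by a reattached stream) to hold at least one element.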
   
   On Tue, Aug 1, 2023, 09:36 Hyukjin Kwon ***@***.***> wrote:
   
   > ***@***.**** commented on this pull request.
   > ------------------------------
   >
   > In
   > connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
   > <https://github.com/apache/spark/pull/42228#discussion_r1280222878>:
   >
   > > +    // hasNext will trigger reattach in case the stream completed without responseComplete
   > +    if (!hasNext()) {
   > +      throw new java.util.NoSuchElementException()
   > +    }
   > +
   > +    // Get next response, possibly triggering reattach in case of stream error.
   > +    var firstTry = true
   > +    val ret = retry {
   > +      if (firstTry) {
   > +        // on first try, we use the existing iterator.
   > +        firstTry = false
   > +      } else {
   > +        // on retry, the iterator is borked, so we need a new one
   > +        iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
   > +      }
   > +      iterator.next()
   >
   > qq: can this iterator be empty?
   >
   




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279762467


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala:
##########
@@ -154,11 +206,27 @@ private[connect] class ExecuteResponseObserver[T](val executeHolder: ExecuteHold
   /**
    * Remove cached responses after response with lastReturnedIndex is returned from getResponse.
    * Remove according to caching policy:
-   *   - if query is not reattachable, remove all responses up to and including
-   *     highestConsumedIndex.
+   *   - if retryBufferSize is 0 (or query is not reattachable), remove all responses up to and
+   *     including lastSentIndex.
+   *   - otherwise keep responses backwards from lastSentIndex until their total size exceeds
+   *     retryBufferSize
    */
-  private def removeCachedResponses() = {
-    var i = highestConsumedIndex
+  private def removeCachedResponses(lastSentIndex: Long) = {
+    var i = lastSentIndex
+    var totalResponsesSize = 0L
+    while (i >= 1 && responses.get(i).isDefined && totalResponsesSize < retryBufferSize) {
+      totalResponsesSize += responses.get(i).get.serializedByteSize

Review Comment:
   Given that it's a total of 1 MB of responses, even if they were tiny (100 bytes?), it's an iteration over at most thousands of elements, and usually only a few, so I thought the optimization wasn't worth the added complication.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279760761


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala:
##########
@@ -154,11 +206,27 @@ private[connect] class ExecuteResponseObserver[T](val executeHolder: ExecuteHold
   /**
    * Remove cached responses after response with lastReturnedIndex is returned from getResponse.
    * Remove according to caching policy:
-   *   - if query is not reattachable, remove all responses up to and including
-   *     highestConsumedIndex.
+   *   - if retryBufferSize is 0 (or query is not reattachable), remove all responses up to and
+   *     including lastSentIndex.
+   *   - otherwise keep responses backwards from lastSentIndex until their total size exceeds
+   *     retryBufferSize
    */
-  private def removeCachedResponses() = {
-    var i = highestConsumedIndex
+  private def removeCachedResponses(lastSentIndex: Long) = {
+    var i = lastSentIndex
+    var totalResponsesSize = 0L
+    while (i >= 1 && responses.get(i).isDefined && totalResponsesSize < retryBufferSize) {
+      totalResponsesSize += responses.get(i).get.serializedByteSize

Review Comment:
   This is indeed calculated on the fly at removal time: it iterates back from the returned response until retryBufferSize worth of responses is kept. This could be smarter: we could keep a "second finger" on the oldest buffered response and cache the total size between it and the highest consumed response (our "first finger"). It does get a bit more complicated, though, when the client backtracks after a reattach (so our "first finger" moves back).
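
   A minimal sketch of the backward walk described above, under stated assumptions: `Cached` and `RetryBuffer.trim` are hypothetical names, the responses live in a plain mutable map keyed by index, and the removal step after the loop is a guess at what follows the truncated hunk rather than the PR's actual code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a cached response; only its size matters here.
final case class Cached(serializedByteSize: Long)

object RetryBuffer {
  /**
   * Walks backwards from lastSentIndex, keeping responses until their total
   * size reaches retryBufferSize, then drops everything older.
   * Returns the cutoff index: responses at or below it are no longer buffered.
   */
  def trim(
      responses: mutable.Map[Long, Cached],
      lastSentIndex: Long,
      retryBufferSize: Long): Long = {
    var i = lastSentIndex
    var totalResponsesSize = 0L
    while (i >= 1 && responses.contains(i) && totalResponsesSize < retryBufferSize) {
      totalResponsesSize += responses(i).serializedByteSize
      i -= 1
    }
    // Assumed removal step: everything at index i and below falls outside the buffer.
    var j = i
    while (j >= 1 && responses.contains(j)) {
      responses.remove(j)
      j -= 1
    }
    i
  }
}
```

   With five 100-byte responses and a 250-byte buffer, the walk keeps indices 3 to 5 (300 bytes, the first total that reaches the limit) and drops 1 and 2. With a buffer size of 0 the loop body never runs, so everything up to and including lastSentIndex is dropped.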





[GitHub] [spark] HyukjinKwon commented on pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42228:
URL: https://github.com/apache/spark/pull/42228#issuecomment-1659517553

   Merged to master and branch-3.5.




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279684108


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+class ExecutePlanResponseReattachableIterator(
+    request: proto.ExecutePlanRequest,
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends java.util.Iterator[proto.ExecutePlanResponse]
+    with Logging {
+
+  val operationId = UUID.randomUUID.toString
+
+  // We don't want retry handling or error conversion done by the custom stubs.
+  private val rawBlockingStub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private val rawAsyncStub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  private val initialRequest: proto.ExecutePlanRequest = request
+    .toBuilder()
+    .addRequestOptions(
+      proto.ExecutePlanRequest.RequestOption.newBuilder()
+        .setReattachOptions(proto.ReattachOptions.newBuilder().setReattachable(true).build())
+        .build())
+    .setOperationId(operationId)
+    .build()
+
+  private var lastResponseId: Option[String] = None
+  private var responseComplete: Boolean = false
+  private var iterator: java.util.Iterator[proto.ExecutePlanResponse] =
+    rawBlockingStub.executePlan(initialRequest)
+
+  override def next(): proto.ExecutePlanResponse = synchronized {
+    hasNext() // will trigger reattach in case the stream completed without responseComplete
+
+    // Get next response, possibly triggering reattach in case of stream error.
+    var firstTry = true
+    val ret = retry {
+      if (firstTry) {
+        // on first try, we use the initial iterator.
+        firstTry = false
+      } else {
+        // Error retry reattach: After an error, attempt to
+        iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+      }
+      iterator.next()
+    }
+
+    // Record last returned response, to know where to restart in case of reattach.
+    lastResponseId = Some(ret.getResponseId)
+    if (ret.hasResponseComplete) {
+      responseComplete = true
+      releaseExecute(None) // release all
+    } else {
+      releaseExecute(lastResponseId) // release until this response
+    }
+    ret
+  }
+
+  override def hasNext(): Boolean = synchronized {
+    if (responseComplete) {
+      // After response complete response
+      return false
+    }
+    retry {
+      var hasNext = iterator.hasNext()
+      // Graceful reattach:
+      // If iterator ended, but there was no ResponseComplete, it means that there is more,
+      // and we need to reattach. While ResponseComplete didn't arrive, we keep reattaching.
+      if (!hasNext && !responseComplete) {
+        while (!hasNext) {
+          iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+          assert(!responseComplete) // shouldn't change...
+          hasNext = iterator.hasNext()
+        }
+      }
+      hasNext
+    }
+  }
+
+  private def releaseExecute(untilResponseId: Option[String]) = {
+    val request = createReleaseExecuteRequest(untilResponseId)
+    rawAsyncStub.releaseExecute(
+      request,
+      createRetryingReleaseExecuteResponseObserer(request)
+    )
+  }
+
+  private def createReattachExecuteRequest() = {
+    val reattach = proto.ReattachExecuteRequest.newBuilder()
+      .setSessionId(initialRequest.getSessionId)
+      .setUserContext(initialRequest.getUserContext)
+      .setOperationId(initialRequest.getOperationId)
+
+    if (initialRequest.hasClientType) {
+      reattach.setClientType(initialRequest.getClientType)
+    }
+
+    if (lastResponseId.isDefined) {
+      reattach.setLastResponseId(lastResponseId.get)
+    }
+    reattach.build()
+  }
+
+  private def createReleaseExecuteRequest(untilResponseId: Option[String]) = {
+    val release = proto.ReleaseExecuteRequest.newBuilder()
+      .setSessionId(initialRequest.getSessionId)
+      .setUserContext(initialRequest.getUserContext)
+      .setOperationId(initialRequest.getOperationId)
+
+    if (initialRequest.hasClientType) {
+      release.setClientType(initialRequest.getClientType)
+    }
+
+    untilResponseId match {
+      case None =>
+        release.setReleaseType(proto.ReleaseExecuteRequest.ReleaseType.RELEASE_ALL)
+      case Some(responseId) =>
+        release.setReleaseType(proto.ReleaseExecuteRequest.ReleaseType.RELEASE_UNTIL_RESPONSE)
+        release.setUntilResponseId(responseId)
+    }
+
+    release.build()
+  }
+
+  private def createRetryingReleaseExecuteResponseObserer(
+    requestForRetry: proto.ReleaseExecuteRequest, currentRetryNum: Int = 0)
+    : StreamObserver[proto.ReleaseExecuteResponse] = {
+    new StreamObserver[proto.ReleaseExecuteResponse] {
+      override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
+      override def onCompleted(): Unit = {}
+      override def onError(t: Throwable): Unit = t match {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          Thread.sleep(
+            (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+              .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+          rawAsyncStub.releaseExecute(requestForRetry,
+            createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1))

Review Comment:
   We want the client to call releaseExecute asynchronously and not block on the ack before continuing with the results. retry would be a blocking iteration... unless I mixed something up and could use it for async as well.
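
   For reference, the delay that the observer above waits between async attempts can be worked out in isolation. This is only a sketch: `RetryPolicySketch` and `Backoff.delayMillis` are hypothetical names, the field names follow the diff, and the default values are illustrative assumptions rather than confirmed settings.

```scala
import scala.concurrent.duration._

// Hypothetical stand-in for GrpcRetryHandler.RetryPolicy. Field names follow
// the diff above; the default values are illustrative assumptions.
final case class RetryPolicySketch(
    maxRetries: Int = 15,
    initialBackoff: FiniteDuration = 50.millis,
    maxBackoff: FiniteDuration = 1.minute,
    backoffMultiplier: Double = 4.0)

object Backoff {
  // min(maxBackoff, initialBackoff * backoffMultiplier ^ currentRetryNum), in milliseconds.
  def delayMillis(policy: RetryPolicySketch, currentRetryNum: Int): Long = {
    val uncapped =
      policy.initialBackoff.toMillis * math.pow(policy.backoffMultiplier, currentRetryNum)
    math.min(policy.maxBackoff.toMillis.toDouble, uncapped).toLong
  }
}
```

   With these assumed values the delays run 50 ms, 200 ms, 800 ms, and so on, and are capped at 60 s from currentRetryNum = 6 onward.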





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279857208


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+class ExecutePlanResponseReattachableIterator(
+    request: proto.ExecutePlanRequest,
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends java.util.Iterator[proto.ExecutePlanResponse]

Review Comment:
   The iterator returned from a regular executePlan is not a closeable iterator, and grpc-java doesn't seem to play nicely with closeable iterators in general: https://github.com/grpc/grpc-java/issues/2409
   I think the best I can do in Java is make sure that internal users in the client consume it... and in places that hand control of it over to the user (like Dataset.toLocalIterator) there's no good way to enforce that.
   If an iterator is left open and idle:
   * the server will close the RPC stream after 5 minutes (STREAM_MAX_TIME)
   * then the server will tear down executions without an attached RPC stream after another 5 minutes (TODO)
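
   To make the question concrete, here is one possible shape for a closeable wrapper. It is purely a sketch: the `CloseableIterator` trait below only borrows its name from the review question, and `ReleasingIterator` and the `release` callback are hypothetical, standing in for whatever cleanup (for example a release call to the server) the client would want to run.

```scala
// Hypothetical trait; the name echoes the review question, the shape is assumed.
trait CloseableIterator[E] extends Iterator[E] with AutoCloseable

// Wraps a plain iterator and runs `release` exactly once, either when the
// caller closes it early or when the underlying iterator is exhausted.
class ReleasingIterator[E](underlying: Iterator[E], release: () => Unit)
    extends CloseableIterator[E] {
  private var released = false

  override def hasNext: Boolean = {
    val more = !released && underlying.hasNext
    if (!more) close()
    more
  }

  override def next(): E = {
    if (!hasNext) throw new NoSuchElementException()
    underlying.next()
  }

  override def close(): Unit = {
    if (!released) {
      released = true
      release()
    }
  }
}
```

   This only helps callers that either drain the iterator or call close(); an iterator that is simply abandoned still relies on the server-side timeouts listed above.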





[GitHub] [spark] grundprinzip commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279664210


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -1447,14 +1475,29 @@
       "The handle <handle> is invalid."
     ],
     "subClass" : {
-      "ALREADY_EXISTS" : {

Review Comment:
   Was this a 3.5-only change? Just to make sure we're not changing something here.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+class ExecutePlanResponseReattachableIterator(

Review Comment:
   Doc please



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -392,6 +400,12 @@ message ExecutePlanResponse {
     string name = 1;
     repeated Expression.Literal values = 2;
   }
+
+  message ResponseComplete {

Review Comment:
   Maybe ResultComplete to avoid confusion with the current response?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -62,6 +64,41 @@ object Connect {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
 
+  val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION =
+    ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamDuration")
+      .internal()
+      .doc("For reattachable execution, after this amount of time the response stream will be " +
+        "automatically completed and client needs to send a new ReattachExecute RPC to continue. " +

Review Comment:
   Completed is a weird term here. But I don't have better suggestions immediately either. 



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -666,6 +680,108 @@ message InterruptResponse {
   repeated string interrupted_ids = 2;
 }
 
+message ReattachOptions {
+  // If true, the request can be reattached to using ReattachExecute.
+  // ReattachExecute can be used either if the stream broke with a GRPC network error,
+  // or if the server closed the stream without sending a response with StreamStatus.complete=true.
+  // The server will keep a buffer of responses in case a response is lost, and
+  // ReattachExecute needs to back-track.
+  //
+  // If false, the execution response stream will will not be reattachable, and all responses are
+  // immediately released by the server after being sent.
+  bool reattachable = 1;
+}
+
+message ReattachExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be an id of existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session+id both identify a unique remote spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be an id of existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  // (Optional)
+  // Last already processed response id from the response stream.
+  // After reattach, server will resume the response stream after that response.
+  // If not specified, server will restart the stream from the start.
+  //
+  // Note: server controls the amount of responses that it buffers and it may drop responses,
+  // that are far behind the latest returned response, so this can't be used to arbitrarily
+  // scroll back the cursor. If the response is no longer available, this will result in an error.
+  optional string last_response_id = 5;
+}
+
+message ReleaseExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be an id of existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session+id both identify a unique remote spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be an id of existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  ReleaseType release_type = 5;
+
+  enum ReleaseType {

Review Comment:
   If you have a parametrized enum that may use message types, why not model it directly as an enum? ReleaseAll and ReleaseUntil.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala:
##########
@@ -154,11 +206,27 @@ private[connect] class ExecuteResponseObserver[T](val executeHolder: ExecuteHold
   /**
    * Remove cached responses after response with lastReturnedIndex is returned from getResponse.
    * Remove according to caching policy:
-   *   - if query is not reattachable, remove all responses up to and including
-   *     highestConsumedIndex.
+   *   - if retryBufferSize is 0 (or query is not reattachable), remove all responses up to and
+   *     including lastSentIndex.
+   *   - otherwise keep responses backwards from lastSentIndex until their total size exceeds
+   *     retryBufferSize
    */
-  private def removeCachedResponses() = {
-    var i = highestConsumedIndex
+  private def removeCachedResponses(lastSentIndex: Long) = {
+    var i = lastSentIndex
+    var totalResponsesSize = 0L
+    while (i >= 1 && responses.get(i).isDefined && totalResponsesSize < retryBufferSize) {
+      totalResponsesSize += responses.get(i).get.serializedByteSize

Review Comment:
   Is the total response size only updated when we remove the items?
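
   For reference, a minimal standalone sketch of the retention policy described in the doc comment above, as I read the hunk. The names are illustrative, all responses are assumed to be present in the cache, and the total is a local value recomputed on every call:

   ```scala
   object RetryBufferSketch {
     /**
      * Returns the 1-based indices of responses that stay cached.
      * sizes(i - 1) is the serialized size of response i (hypothetical input).
      */
     def keptIndices(
         sizes: IndexedSeq[Long],
         lastSentIndex: Int,
         retryBufferSize: Long): Seq[Int] = {
       var i = lastSentIndex
       var total = 0L // recomputed from scratch on every call
       // Walk backwards from lastSentIndex until the kept responses fill the buffer.
       while (i >= 1 && total < retryBufferSize) {
         total += sizes(i - 1)
         i -= 1
       }
       // Responses i + 1 .. lastSentIndex are kept; everything at or below i is removed.
       (i + 1) to lastSentIndex
     }
   }
   ```

   With retryBufferSize = 0 the loop never runs and nothing is kept, which matches the first bullet of the doc comment.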



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -666,6 +680,108 @@ message InterruptResponse {
   repeated string interrupted_ids = 2;
 }
 
+message ReattachOptions {
+  // If true, the request can be reattached to using ReattachExecute.
+  // ReattachExecute can be used either if the stream broke with a GRPC network error,
+  // or if the server closed the stream without sending a response with StreamStatus.complete=true.
+  // The server will keep a buffer of responses in case a response is lost, and
+  // ReattachExecute needs to back-track.
+  //
+  // If false, the execution response stream will will not be reattachable, and all responses are
+  // immediately released by the server after being sent.
+  bool reattachable = 1;
+}
+
+message ReattachExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be an id of existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session+id both identify a unique remote spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be an id of existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  // (Optional)
+  // Last already processed response id from the response stream.
+  // After reattach, server will resume the response stream after that response.
+  // If not specified, server will restart the stream from the start.
+  //
+  // Note: server controls the amount of responses that it buffers and it may drop responses,
+  // that are far behind the latest returned response, so this can't be used to arbitrarily
+  // scroll back the cursor. If the response is no longer available, this will result in an error.
+  optional string last_response_id = 5;
+}
+
+message ReleaseExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be an id of existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session+id both identify a unique remote spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be an id of existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  ReleaseType release_type = 5;
+
+  enum ReleaseType {

Review Comment:
   This gets you around the issue of the unspecified value as well.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -46,72 +57,118 @@ private[connect] class ExecuteGrpcResponseSender[T](grpcObserver: StreamObserver
 
   /**
    * Attach to the executionObserver, consume responses from it, and send them to grpcObserver.
+   *
+   * In non reattachable execution, it will keep sending responses until the query finishes. In
+   * reattachable execution, it can finish earlier after reaching a time deadline or size limit.
+   *
+   * After this function finishes, the grpcObserver is closed with either onCompleted or onError.
+   *
    * @param lastConsumedStreamIndex
    *   the last index that was already consumed and sent. This sender will start from index after
    *   that. 0 means start from beginning (since first response has index 1)
-   *
-   * @return
-   *   true if the execution was detached before stream completed. The caller needs to finish the
-   *   grpcObserver stream false if stream was finished. In this case, grpcObserver stream is
-   *   already completed.
    */
-  def run(
-      executionObserver: ExecuteResponseObserver[T],
-      lastConsumedStreamIndex: Long): Boolean = {
+  def run(lastConsumedStreamIndex: Long): Unit = {
+    logDebug(
+      s"GrpcResponseSender run for $executeHolder, " +
+        s"reattachable=${executeHolder.reattachable}, " +
+        s"lastConsumedStreamIndex=$lastConsumedStreamIndex")
+
     // register to be notified about available responses.
     executionObserver.attachConsumer(this)
 
     var nextIndex = lastConsumedStreamIndex + 1
     var finished = false
 
+    // Time at which this sender should finish if the response stream is not finished by then.
+    val deadlineTimeMillis = if (!executeHolder.reattachable) {
+      Long.MaxValue
+    } else {
+      val confSize =
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
+      if (confSize > 0) System.currentTimeMillis() + 1000 * confSize else Long.MaxValue

Review Comment:
   Does this not return a duration or something similar to avoid the manual conversion?
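
   One way to avoid the hand-written `1000 *` is to let `java.util.concurrent.TimeUnit` do the conversion. A minimal sketch, assuming the config value is in seconds (as the multiplication in the hunk suggests); the object and method names are hypothetical:

   ```scala
   import java.util.concurrent.TimeUnit

   object DeadlineSketch {
     /** Deadline in epoch millis, or Long.MaxValue when the limit is disabled (<= 0). */
     def deadlineMillis(nowMillis: Long, maxStreamDurationSeconds: Long): Long =
       if (maxStreamDurationSeconds > 0) {
         nowMillis + TimeUnit.SECONDS.toMillis(maxStreamDurationSeconds)
       } else {
         Long.MaxValue
       }
   }
   ```

   Spark's ConfigBuilder also has `timeConf(unit)`, which may be a better fit if the entry can be declared directly in milliseconds.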





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279766856


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -1447,14 +1475,29 @@
       "The handle <handle> is invalid."
     ],
     "subClass" : {
-      "ALREADY_EXISTS" : {

Review Comment:
   Yes, added in https://github.com/apache/spark/pull/42009/files#diff-74b78a86e87e47c520c1183be7ec8b5220378f123e47daad7f9e4cdd1a66336cR1386 





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279885210


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -62,6 +64,41 @@ object Connect {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
 
+  val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION =
+    ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamDuration")
+      .internal()
+      .doc("For reattachable execution, after this amount of time the response stream will be " +
+        "automatically completed and client needs to send a new ReattachExecute RPC to continue. " +

Review Comment:
   The "completed" here comes from the server calling onCompleted on the stream; I couldn't find a better word either.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279752760


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+class ExecutePlanResponseReattachableIterator(

Review Comment:
   Yes, the whole client side is still missing docs.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1278865908


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+class ExecutePlanResponseReattachableIterator(
+    request: proto.ExecutePlanRequest,
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends java.util.Iterator[proto.ExecutePlanResponse]
+    with Logging {
+
+  val operationId = UUID.randomUUID.toString
+
+  // We don't want retry handling or error conversion done by the custom stubs.
+  private val rawBlockingStub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private val rawAsyncStub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  private val initialRequest: proto.ExecutePlanRequest = request
+    .toBuilder()
+    .addRequestOptions(
+      proto.ExecutePlanRequest.RequestOption.newBuilder()
+        .setReattachOptions(proto.ReattachOptions.newBuilder().setReattachable(true).build())
+        .build())
+    .setOperationId(operationId)
+    .build()
+
+  private var lastResponseId: Option[String] = None
+  private var responseComplete: Boolean = false
+  private var iterator: java.util.Iterator[proto.ExecutePlanResponse] =
+    rawBlockingStub.executePlan(initialRequest)
+
+  override def next(): proto.ExecutePlanResponse = synchronized {
+    hasNext() // will trigger reattach in case the stream completed without responseComplete
+
+    // Get next response, possibly triggering reattach in case of stream error.
+    var firstTry = true
+    val ret = retry {
+      if (firstTry) {
+        // on first try, we use the initial iterator.
+        firstTry = false
+      } else {
+        // Error retry reattach: After an error, attempt to
+        iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+      }
+      iterator.next()
+    }
+
+    // Record last returned response, to know where to restart in case of reattach.
+    lastResponseId = Some(ret.getResponseId)
+    if (ret.hasResponseComplete) {
+      responseComplete = true
+      releaseExecute(None) // release all
+    } else {
+      releaseExecute(lastResponseId) // release until this response
+    }
+    ret
+  }
+
+  override def hasNext(): Boolean = synchronized {
+    if (responseComplete) {
+      // After response complete response
+      return false
+    }
+    retry {
+      var hasNext = iterator.hasNext()
+      // Graceful reattach:
+      // If iterator ended, but there was no ResponseComplete, it means that there is more,
+      // and we need to reattach. While ResponseComplete didn't arrive, we keep reattaching.
+      if (!hasNext && !responseComplete) {
+        while (!hasNext) {
+          iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+          assert(!responseComplete) // shouldn't change...
+          hasNext = iterator.hasNext()
+        }
+      }
+      hasNext
+    }
+  }
+
+  private def releaseExecute(untilResponseId: Option[String]) = {
+    val request = createReleaseExecuteRequest(untilResponseId)
+    rawAsyncStub.releaseExecute(
+      request,
+      createRetryingReleaseExecuteResponseObserer(request)
+    )
+  }
+
+  private def createReattachExecuteRequest() = {
+    val reattach = proto.ReattachExecuteRequest.newBuilder()
+      .setSessionId(initialRequest.getSessionId)
+      .setUserContext(initialRequest.getUserContext)
+      .setOperationId(initialRequest.getOperationId)
+
+    if (initialRequest.hasClientType) {
+      reattach.setClientType(initialRequest.getClientType)
+    }
+
+    if (lastResponseId.isDefined) {
+      reattach.setLastResponseId(lastResponseId.get)
+    }
+    reattach.build()
+  }
+
+  private def createReleaseExecuteRequest(untilResponseId: Option[String]) = {
+    val release = proto.ReleaseExecuteRequest.newBuilder()
+      .setSessionId(initialRequest.getSessionId)
+      .setUserContext(initialRequest.getUserContext)
+      .setOperationId(initialRequest.getOperationId)
+
+    if (initialRequest.hasClientType) {
+      release.setClientType(initialRequest.getClientType)
+    }
+
+    untilResponseId match {
+      case None =>
+        release.setReleaseType(proto.ReleaseExecuteRequest.ReleaseType.RELEASE_ALL)
+      case Some(responseId) =>
+        release.setReleaseType(proto.ReleaseExecuteRequest.ReleaseType.RELEASE_UNTIL_RESPONSE)
+        release.setUntilResponseId(responseId)
+    }
+
+    release.build()
+  }
+
+  private def createRetryingReleaseExecuteResponseObserer(
+    requestForRetry: proto.ReleaseExecuteRequest, currentRetryNum: Int = 0)
+    : StreamObserver[proto.ReleaseExecuteResponse] = {
+    new StreamObserver[proto.ReleaseExecuteResponse] {
+      override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
+      override def onCompleted(): Unit = {}
+      override def onError(t: Throwable): Unit = t match {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          Thread.sleep(
+            (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+              .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+          rawAsyncStub.releaseExecute(requestForRetry,
+            createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1))

Review Comment:
   Quick question: why do we do this with `StreamObserver`? Can't we just retry with the `retry` you defined below?
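
   For context, the delay computed in `onError` above is a capped exponential backoff. A minimal standalone sketch of that arithmetic, with plain millisecond values standing in for the `retryPolicy` fields (the names and numbers here are illustrative):

   ```scala
   object BackoffSketch {
     /** Delay before retry number retryNum (0-based), capped at maxMs. */
     def backoffMillis(initialMs: Long, maxMs: Long, multiplier: Double, retryNum: Int): Long =
       math.min(maxMs.toDouble, initialMs * math.pow(multiplier, retryNum)).toLong
   }
   ```

   Note that the hunk sleeps for this delay inside the observer callback, which blocks the thread that delivers the gRPC callback; a blocking `retry` helper would at least make that explicit.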





[GitHub] [spark] ueshin commented on pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42228:
URL: https://github.com/apache/spark/pull/42228#issuecomment-1659549496

   Hi, this seems to break the 3.5 build.




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42228: [SPARK-44421][CONNECT] Reattachable execution in Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1279755066


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -666,6 +680,108 @@ message InterruptResponse {
   repeated string interrupted_ids = 2;
 }
 
+message ReattachOptions {
+  // If true, the request can be reattached to using ReattachExecute.
+  // ReattachExecute can be used either if the stream broke with a GRPC network error,
+  // or if the server closed the stream without sending a response with StreamStatus.complete=true.
+  // The server will keep a buffer of responses in case a response is lost, and
+  // ReattachExecute needs to back-track.
+  //
+  // If false, the execution response stream will will not be reattachable, and all responses are
+  // immediately released by the server after being sent.
+  bool reattachable = 1;
+}
+
+message ReattachExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be an id of existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session+id both identify a unique remote spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be an id of existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  // (Optional)
+  // Last already processed response id from the response stream.
+  // After reattach, server will resume the response stream after that response.
+  // If not specified, server will restart the stream from the start.
+  //
+  // Note: server controls the amount of responses that it buffers and it may drop responses,
+  // that are far behind the latest returned response, so this can't be used to arbitrarily
+  // scroll back the cursor. If the response is no longer available, this will result in an error.
+  optional string last_response_id = 5;
+}
+
+message ReleaseExecuteRequest {
+  // (Required)
+  //
+  // The session_id of the request to reattach to.
+  // This must be an id of existing session.
+  string session_id = 1;
+
+  // (Required) User context
+  //
+  // user_context.user_id and session+id both identify a unique remote spark session on the
+  // server side.
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide an id of the request to reattach to.
+  // This must be an id of existing operation.
+  string operation_id = 3;
+
+  // Provides optional information about the client sending the request. This field
+  // can be used for language or version specific information and is only intended for
+  // logging purposes and will not be interpreted by the server.
+  optional string client_type = 4;
+
+  ReleaseType release_type = 5;
+
+  enum ReleaseType {

Review Comment:
   You mean
   ```
   oneof release_type {
     ReleaseAll release_all = 11;
     ReleaseUntil release_until = 12;
   }
   
   message ReleaseAll {}
   message ReleaseUntil {
     string response_id = 1;
   }
   ```
   ?
   I kind of wish I had done Interrupt like that in the first place, but I then did this the same way as Interrupt.
   I could change this, though.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42228: [SPARK-44421][SPARK-44423][CONNECT] Reattachable execution in Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42228:
URL: https://github.com/apache/spark/pull/42228#discussion_r1280222878


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+
+/**
+ * Retryable iterator of ExecutePlanResponses to an ExecutePlan call.
+ *
+ * It can handle situations when:
+ *   - the ExecutePlanResponse stream was broken by retryable network error (governed by
+ *     retryPolicy)
+ *   - the ExecutePlanResponse was gracefully ended by the server without a ResultComplete
+ *     message; this tells the client that there is more, and it should reattach to continue.
+ *
+ * Initial iterator is the result of an ExecutePlan on the request, but it can be reattached with
+ * ReattachExecute request. ReattachExecute request is provided the responseId of last returned
+ * ExecutePlanResponse on the iterator to return a new iterator from server that continues after
+ * that.
+ *
+ * Since in reattachable execute the server does buffer some responses in case the client needs to
+ * backtrack
+ */
+class ExecutePlanResponseReattachableIterator(
+    request: proto.ExecutePlanRequest,
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy)
+    extends java.util.Iterator[proto.ExecutePlanResponse]
+    with Logging {
+
+  val operationId = if (request.hasOperationId) {
+    request.getOperationId
+  } else {
+    // Add operation id, if not present.
+    // With operationId set by the client, the client can use it to try to reattach on error
+    // even before getting the first response. If the operation in fact didn't even reach the
+    // server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND error.
+    UUID.randomUUID.toString
+  }
+
+  // Need raw stubs, don't want retry handling or error conversion done by the custom stubs.
+  // - this does its own custom retry handling
+  // - error conversion is wrapped around this in CustomSparkConnectBlockingStub,
+  //   this needs raw gRPC errors for retries.
+  private val rawBlockingStub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private val rawAsyncStub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  private val initialRequest: proto.ExecutePlanRequest = request
+    .toBuilder()
+    .addRequestOptions(
+      proto.ExecutePlanRequest.RequestOption
+        .newBuilder()
+        .setReattachOptions(proto.ReattachOptions.newBuilder().setReattachable(true).build())
+        .build())
+    .setOperationId(operationId)
+    .build()
+
+  // ResponseId of the last response returned by next()
+  private var lastReturnedResponseId: Option[String] = None
+
+  // True after the ResultComplete message was seen in the stream.
+  // The server always sends this message at the end of the stream; if the underlying iterator
+  // finishes without producing one, another iterator needs to be reattached.
+  private var responseComplete: Boolean = false
+
+  // Initial iterator comes from ExecutePlan request.
+  private var iterator: java.util.Iterator[proto.ExecutePlanResponse] =
+    rawBlockingStub.executePlan(initialRequest)
+
+  override def next(): proto.ExecutePlanResponse = synchronized {
+    // hasNext will trigger reattach in case the stream completed without responseComplete
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException()
+    }
+
+    // Get next response, possibly triggering reattach in case of stream error.
+    var firstTry = true
+    val ret = retry {
+      if (firstTry) {
+        // on first try, we use the existing iterator.
+        firstTry = false
+      } else {
+        // on retry, the iterator is borked, so we need a new one
+        iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+      }
+      iterator.next()

Review Comment:
   qq: can this iterator be empty?
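
For context on this question: the quoted `next()` calls `hasNext()` before touching `iterator`, and its comment says `hasNext` reattaches when the stream finished without the completion message. The part of the diff shown here does not include `hasNext`, so the following is only a minimal, self-contained sketch of that idea, not the PR's implementation. `Response`, `ReattachingIterator`, and the `attach` callback are hypothetical stand-ins (no gRPC or protobuf involved), and retry handling for stream errors is left out.

```scala
// Hypothetical stand-in for proto.ExecutePlanResponse; not the real protobuf class.
final case class Response(responseId: String, resultComplete: Boolean)

// `attach` is a hypothetical callback: given the id of the last response the caller
// consumed (None for the initial call), it opens a stream that continues after it.
class ReattachingIterator(attach: Option[String] => Iterator[Response])
    extends Iterator[Response] {

  // Id of the last response returned by next().
  private var lastReturnedResponseId: Option[String] = None
  // True once a response carrying the completion marker has been returned.
  private var resultComplete = false
  // The initial stream plays the role of the ExecutePlan call.
  private var underlying: Iterator[Response] = attach(None)

  override def hasNext: Boolean = synchronized {
    if (resultComplete) {
      false
    } else {
      // An exhausted (or empty) stream without the completion marker means there is
      // more to fetch, so reattach. Assumption of this sketch: the server eventually
      // returns a response; a real client would bound or time out this loop.
      while (!underlying.hasNext) {
        underlying = attach(lastReturnedResponseId)
      }
      true
    }
  }

  override def next(): Response = synchronized {
    if (!hasNext) throw new NoSuchElementException()
    val ret = underlying.next()
    lastReturnedResponseId = Some(ret.responseId)
    if (ret.resultComplete) resultComplete = true
    ret
  }
}
```

Under these assumptions, `underlying.next()` is only reached after `hasNext` has confirmed that the current stream holds a response, so an empty stream leads to a reattach rather than a `NoSuchElementException` from the underlying iterator.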


