Posted to reviews@spark.apache.org by "juliuszsompolski (via GitHub)" <gi...@apache.org> on 2023/05/25 14:43:18 UTC

[GitHub] [spark] juliuszsompolski opened a new pull request, #41315: [SPARK-43755]

juliuszsompolski opened a new pull request, #41315:
URL: https://github.com/apache/spark/pull/41315

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260932187


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be sent after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)
+      assert(ret.index == index)
+      Some(ret)
+    } else {
+      None
+    }
+  }
+
+  /** Get the stream error, if set.  */
+  def getError(): Option[Throwable] = synchronized {
+    error
+  }
+
+  /** If the stream is finished, the index of the last response, otherwise unset. */
+  def getLastIndex(): Option[Long] = synchronized {
+    lastIndex

Review Comment:
   A thing that gets confusing here is conflating the "index" of the response with the "index" into the ListBuffer. Because we remove cached responses from the front of the list, the two are not the same. Every mention of "index" in this file refers to CachedExecutePlanResponse.index; the ListBuffer and positions within it are an internal implementation detail. Will try to make this clearer.
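   
   A minimal sketch of the two kinds of "index" (CachedResponse is a made-up stand-in for CachedExecutePlanResponse):
   ```scala
   import scala.collection.mutable.ListBuffer
   
   // Stand-in for CachedExecutePlanResponse: `index` is the stream index,
   // assigned when the response is produced and never changed afterwards.
   case class CachedResponse(payload: String, index: Long)
   
   val responses = new ListBuffer[CachedResponse]()
   responses += CachedResponse("a", 1)
   responses += CachedResponse("b", 2)
   responses += CachedResponse("c", 3)
   
   // Drop everything up to stream index 2, like removeUntilIndex(2) does.
   while (responses.nonEmpty && responses(0).index <= 2) {
     responses.remove(0)
   }
   
   // Stream index 3 now sits at ListBuffer position 0: the buffer position
   // is streamIndex - responses(0).index, not the stream index itself.
   val streamIndex = 3L
   val pos = (streamIndex - responses(0).index).toInt
   assert(responses(pos).index == streamIndex)
   ```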





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260922955


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver() extends StreamObserver[ExecutePlanResponse] {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var completed: Boolean = false
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    notifySender()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)

Review Comment:
   Adding to what we were talking about:
   * This picks up errors of ExecutePlan, but it wouldn't be hard to extend this execution mechanism to other RPCs.
   * However, this will pick up errors on the execution thread only. Errors that happen on the gRPC thread (these would be more internal errors, like being unable to start the execution thread) would need a different mechanism.
   
   TBH, if I were to design it tabula rasa, I would not use gRPC errors for application errors (like errors from Spark execution), but have these returned as an explicit message type, and reserve gRPC onError for server errors. That would free us from the gRPC onError size limitations that we now need workarounds for. It would also make it easier to distinguish network/framework errors from user errors, which in turn would make it easier to establish retry policies or to keep stats on user errors vs. system errors... But such a change couldn't be made backwards compatible at this point. Maybe for Spark 4.0?
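   
   For illustration, an explicit application-error channel could look roughly like the sketch below; every type and field name in it is hypothetical, and none of it exists in the actual Spark Connect proto:
   ```scala
   // Hypothetical response envelope: application errors travel as ordinary
   // stream messages, and gRPC onError stays reserved for server/transport
   // failures. These types are made up for illustration only.
   sealed trait ExecuteResult
   case class ResultData(arrowBatch: Array[Byte]) extends ExecuteResult
   case class ApplicationError(errorClass: String, message: String) extends ExecuteResult
   
   def handle(result: ExecuteResult): Unit = result match {
     case ResultData(batch) =>
       println(s"got ${batch.length} result bytes")
     case ApplicationError(cls, msg) =>
       // A structured application error: no gRPC status size limits, and a
       // clean separation from transport errors for retry policies / stats.
       println(s"query failed: $cls: $msg")
   }
   ```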





[GitHub] [spark] xuanyuanking commented on pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "xuanyuanking (via GitHub)" <gi...@apache.org>.
xuanyuanking commented on PR #41315:
URL: https://github.com/apache/spark/pull/41315#issuecomment-1636127311

   Thanks! Merged to master.




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260969980


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be sent after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)
+      assert(ret.index == index)
+      Some(ret)
+    } else {
+      None
+    }
+  }
+
+  /** Get the stream error, if set.  */
+  def getError(): Option[Throwable] = synchronized {
+    error
+  }
+
+  /** If the stream is finished, the index of the last response, otherwise unset. */
+  def getLastIndex(): Option[Long] = synchronized {
+    lastIndex
+  }
+
+  def completed(): Boolean = synchronized {
+    lastIndex.isDefined

Review Comment:
   Ditto: lastIndex is a stream index, not a position in the responses list.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260969507


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be sent after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)
+      assert(ret.index == index)
+      Some(ret)
+    } else {
+      None

Review Comment:
   it's used by the Sender to probe whether a response with a given index is available, so returning None here is valid (after which the Sender goes to wait() and gets notified once a new response becomes available)
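   
   Roughly, the probe loop on the Sender side could look like the sketch below (the Sender's internals are not part of this hunk, so the names here are assumptions):
   ```scala
   // Consumer side of the wait/notify protocol: probe for the next
   // response; if it is not available yet, wait() on the observer's
   // monitor until the producer's notifyAll() wakes us up.
   def sendLoop(observer: ExecutePlanResponseObserver): Unit = {
     var nextIndex = 1L // first response has index 1
     var finished = false
     while (!finished) {
       observer.synchronized {
         observer.getResponse(nextIndex) match {
           case Some(cached) =>
             // ... send the cached response downstream ...
             nextIndex += 1
           case None if observer.getLastIndex().exists(_ < nextIndex) =>
             finished = true // stream completed; nothing more will arrive
           case None =>
             observer.wait() // None is a valid answer: block until notified
         }
       }
     }
   }
   ```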





[GitHub] [spark] juliuszsompolski commented on pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #41315:
URL: https://github.com/apache/spark/pull/41315#issuecomment-1563281104

   @hvanhovell @grundprinzip @LuciferYang 




[GitHub] [spark] hvanhovell commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1213565180


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }
+    }
+
+    try {
+      // Start execution thread..
+      this.executionThread.start()
+      // ... and wait for execution thread to finish.
+      // TODO: Detach execution from RPC request. Then this can return early, and results
+      // are served to the client via additional RPCs from ExecutePlanResponseObserver.
+      this.executionThread.join()
+
+      executionError.foreach { error =>
+        logDebug(s"executionError: ${error}")
+        throw error
+      }
+    } catch {
+      case NonFatal(e) =>
+        // In case of exception happening on the handler thread, interrupt the underlying execution.
+        this.interrupt()
+        throw e
+    }
+  }
+
+  protected def execute() = SparkConnectArtifactManager.withArtifactClassLoader {
+    try {
+      // synchronized - check if already got interrupted while starting.
+      synchronized {
+        if (interrupted) {
+          throw new InterruptedException()
+        }
+      }
+
+      session.withActive {
+        val debugString = requestString(executePlanRequest.get)
+
+        session.sparkContext.setJobGroup(
+          jobGroupId,
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}",
+          interruptOnCancel = true)
+
+        // Add debug information to the query execution so that the jobs are traceable.
+        session.sparkContext.setLocalProperty(
+          "callSite.short",
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+        session.sparkContext.setLocalProperty(
+          "callSite.long",
+          StringUtils.abbreviate(debugString, 2048))
+
+        executePlanRequest.foreach { request =>
+          request.getPlan.getOpTypeCase match {
+            case proto.Plan.OpTypeCase.COMMAND => handleCommand(request)
+            case proto.Plan.OpTypeCase.ROOT => handlePlan(request)
+            case _ =>
+              throw new UnsupportedOperationException(
+                s"${request.getPlan.getOpTypeCase} not supported.")
+          }
+        }
+      }
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.

Review Comment:
   HiveServer? What is that :P....





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1261445799


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -33,26 +32,26 @@ import org.apache.spark.sql.SparkSession
 case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
     extends Logging {
 
-  val executePlanOperations: ConcurrentMap[String, ExecutePlanHolder] =
-    new ConcurrentHashMap[String, ExecutePlanHolder]()
+  val executions: ConcurrentMap[String, ExecutionHolder] =
+    new ConcurrentHashMap[String, ExecutionHolder]()
 
-  private[connect] def createExecutePlanHolder(
-      request: proto.ExecutePlanRequest): ExecutePlanHolder = {
+  private[connect] def createExecutionHolder(): ExecutionHolder = {
 
     val operationId = UUID.randomUUID().toString
-    val executePlanHolder = ExecutePlanHolder(operationId, this, request)
-    assert(executePlanOperations.putIfAbsent(operationId, executePlanHolder) == null)
+    val executePlanHolder = ExecutionHolder(operationId, this)
+    assert(executions.putIfAbsent(operationId, executePlanHolder) == null)

Review Comment:
   We are removing asserts from prod code; however, they still run in testing.
   I personally would prefer to have asserts in prod as well, but Spark doesn't seem to do that in general...
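   
   For reference, this relies on Scala's assert being elidable, which is generic Scala behavior rather than anything Spark-specific (whether the release build uses exactly this flag is an assumption):
   ```scala
   // Scala's assert is annotated @elidable(ASSERTION): compiling with
   // -Xelide-below 2001 or higher removes the call entirely, so asserts
   // can run in test builds yet be absent from production jars.
   // require is not elidable and always throws on violation.
   def register(key: String, previous: AnyRef): Unit = {
     assert(previous == null) // compiled away when elided
     require(key.nonEmpty, "operation id must not be empty") // always checked
   }
   ```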





[GitHub] [spark] juliuszsompolski commented on pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #41315:
URL: https://github.com/apache/spark/pull/41315#issuecomment-1564663576

   ```
   [info] *** 1 TEST FAILED ***
   [error] Failed: Total 3649, Failed 1, Errors 0, Passed 3648, Ignored 10, Canceled 2
   [error] Failed tests:
   [error] 	org.apache.spark.storage.BlockManagerProactiveReplicationSuite
   ```
   Unrelated flake.
   Will hold off on retriggering CI until review comments come in.




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1205839099


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None

Review Comment:
   I renamed it, and made executePlanRequest / executePlanResponseObserver Options here, because I think this ExecutionHolder could be useful for holding executions other than ExecutePlanRequest.
   E.g. we might want to execute AnalyzePlanRequest requests through it as well...
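   
   I.e. hypothetically something of this shape (the AnalyzePlanRequest field is made up for illustration and not part of this PR):
   ```scala
   import org.apache.spark.connect.proto
   import org.apache.spark.sql.connect.service.SessionHolder
   
   // Sketch: request-specific state kept as Options so the holder could be
   // reused by RPCs other than ExecutePlan. Hypothetical shape only.
   case class GenericExecutionHolder(operationId: String, sessionHolder: SessionHolder) {
     var executePlanRequest: Option[proto.ExecutePlanRequest] = None
     var analyzePlanRequest: Option[proto.AnalyzePlanRequest] = None // made up
   }
   ```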



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
+import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecutionHolder
+import org.apache.spark.sql.execution.{LocalTableScanExec, SQLExecution}
+import org.apache.spark.sql.execution.arrow.ArrowConverters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Handle ExecutePlanRequest where the operation to handle is plan execution of type
+ * proto.Plan.OpTypeCase.ROOT
+ * @param executionHolder
+ */
+class SparkConnectPlanExecution(executionHolder: ExecutionHolder) {

Review Comment:
   This class is no-changes verbatim moved out of SparkConnectStreamHandler.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -15,27 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connect.service
+package org.apache.spark.sql.connect.execution
 
-import org.apache.spark.connect.proto
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
 
 /**
- * Object used to hold the Spark Connect execution state.
+ * Container for responses to Execution.
+ * TODO: At this moment, it simply forwards responses to the underlying gRPC StreamObserver.
+ *       Once execution is detached from the RPC handler, this can be used to store the
+ *       responses and then send them to different gRPC StreamObservers.
+ *
+ * @param responseObserver
  */
-case class ExecutePlanHolder(
-    operationId: String,
-    sessionHolder: SessionHolder,
-    request: proto.ExecutePlanRequest) {
+class ExecutePlanResponseObserver(responseObserver: StreamObserver[ExecutePlanResponse])
+  extends StreamObserver[ExecutePlanResponse] {

Review Comment:
   This is a bit of a stub for now, but wanted to put it in already, so I don't need to move code around in followups.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/executionUtils.scala:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
+
+/**
+ * Helper object for generating responses with metrics from queries.
+ */
+object MetricGenerator extends AdaptiveSparkPlanHelper {

Review Comment:
   This is practically no-changes moved out of SparkConnectStreamHandler.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }
+    }
+
+    try {
+      // Start execution thread..
+      this.executionThread.start()
+      // ... and wait for execution thread to finish.
+      // TODO: Detach execution from RPC request. Then this can return early, and results
+      // are served to the client via additional RPCs from ExecutePlanResponseObserver.
+      this.executionThread.join()
+
+      executionError.foreach { error =>
+        logDebug(s"executionError: ${error}")
+        throw error
+      }
+    } catch {
+      case NonFatal(e) =>
+        // In case of exception happening on the handler thread, interrupt the underlying execution.
+        this.interrupt()
+        throw e
+    }
+  }
+
+  protected def execute() = SparkConnectArtifactManager.withArtifactClassLoader {
+    try {
+      // synchronized - check if already got interrupted while starting.
+      synchronized {
+        if (interrupted) {
+          throw new InterruptedException()
+        }
+      }
+
+      session.withActive {
+        val debugString = requestString(executePlanRequest.get)
+
+        session.sparkContext.setJobGroup(
+          jobGroupId,
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}",
+          interruptOnCancel = true)
+
+        // Add debug information to the query execution so that the jobs are traceable.
+        session.sparkContext.setLocalProperty(
+          "callSite.short",
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+        session.sparkContext.setLocalProperty(
+          "callSite.long",
+          StringUtils.abbreviate(debugString, 2048))
+
+        executePlanRequest.foreach { request =>
+          request.getPlan.getOpTypeCase match {
+            case proto.Plan.OpTypeCase.COMMAND => handleCommand(request)
+            case proto.Plan.OpTypeCase.ROOT => handlePlan(request)
+            case _ =>
+              throw new UnsupportedOperationException(
+                s"${request.getPlan.getOpTypeCase} not supported.")
+          }
+        }
+      }
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        // scalastyle:off
+        logDebug(s"Exception in execute: $e")
+        // Always cancel all remaining execution after error.
+        sessionHolder.session.sparkContext.cancelJobGroup(jobGroupId)
+        executionError = if (interrupted) {
+          // Turn the interrupt into OPERATION_CANCELLED error.
+          Some(new SparkSQLException("OPERATION_CANCELLED", Map.empty))
+        } else {
+          // Return the originally thrown error.
+          Some(e)
+        }
+    } finally {
+      session.sparkContext.clearJobGroup()
+    }
+  }
+
+  def interrupt(): Unit = {
+    synchronized {
+      interrupted = true
+      if (executionThread != null) {
+        executionThread.interrupt()
+      }
+    }
+  }
+
+  private def handlePlan(request: ExecutePlanRequest): Unit = {
+    val request = executePlanRequest.get
+    val responseObserver = executePlanResponseObserver.get
+
+    val execution = new SparkConnectPlanExecution(this)
+    execution.handlePlan(responseObserver)
+  }
+
+  private def handleCommand(request: ExecutePlanRequest): Unit = {
+    val request = executePlanRequest.get
+    val responseObserver = executePlanResponseObserver.get
+
+    val command = request.getPlan.getCommand
+    val planner = new SparkConnectPlanner(sessionHolder.session)

Review Comment:
   I think command execution should be refactored into org.apache.spark.sql.connect.execution.SparkConnectCommandExecution in a followup. It doesn't belong in SparkConnectPlanner.





[GitHub] [spark] hvanhovell commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1213562310


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }
+    }
+
+    try {
+      // Start execution thread..
+      this.executionThread.start()
+      // ... and wait for execution thread to finish.
+      // TODO: Detach execution from RPC request. Then this can return early, and results
+      // are served to the client via additional RPCs from ExecutePlanResponseObserver.
+      this.executionThread.join()
+
+      executionError.foreach { error =>

Review Comment:
   This might need synchronization. I am not sure what is guaranteed by Thread.join here.
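   
   For what it's worth, the JMM does guarantee that all writes made by a thread before it terminates are visible after join() on it returns (join establishes a happens-before edge); a minimal illustration with made-up names, not the actual ExecutionHolder:
   ```scala
   // join() establishes a happens-before edge: everything the worker wrote
   // before terminating is visible to the joining thread afterwards, even
   // without a lock. The synchronized blocks just make the invariant explicit.
   class Runner {
     private var error: Option[Throwable] = None
   
     def run(): Unit = {
       val worker = new Thread(() => {
         try throw new RuntimeException("boom")
         catch { case e: Throwable => synchronized { error = Some(e) } }
       })
       worker.start()
       worker.join() // all of worker's writes are visible after this returns
       synchronized { error.foreach(e => println(s"execution failed: $e")) }
     }
   }
   ```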





[GitHub] [spark] hvanhovell commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1213559035


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None

Review Comment:
   private?





[GitHub] [spark] hvanhovell commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1213558400


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None

Review Comment:
   private?





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260962972


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.

Review Comment:
   `lastIndex` is used as the last `CachedExecutePlanResponse.index`





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1263500318


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -51,25 +51,24 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   // Mapping from id to StreamingQueryListener. Used for methods like removeListener() in
   // StreamingQueryManager.
   private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] =
-    new ConcurrentHashMap()
-
-  private[connect] def createExecutePlanHolder(
-      request: proto.ExecutePlanRequest): ExecutePlanHolder = {
+  new ConcurrentHashMap()

Review Comment:
   scalafmt failed, so I made this fix.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1263538205


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -51,25 +51,24 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   // Mapping from id to StreamingQueryListener. Used for methods like removeListener() in
   // StreamingQueryManager.
   private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] =
-    new ConcurrentHashMap()
-
-  private[connect] def createExecutePlanHolder(
-      request: proto.ExecutePlanRequest): ExecutePlanHolder = {
+  new ConcurrentHashMap()

Review Comment:
   Thank you! I must have eaten this whitespace while resolving the conflict there...





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1263105233


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecuteHolder
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.util.Utils
+
+/**
+ * This class launches the actual execution in an execution thread. The execution pushes the
+ * responses to an ExecuteResponseObserver in executeHolder.
+ */
+private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends Logging {
+
+  // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+  // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+  // forwarding of thread locals needs to be taken into account.
+  private var executionThread: Thread = new ExecutionThread()
+
+  private var interrupted: Boolean = false
+
+  /** Launches the execution in a background thread, returns immediately. */
+  def start(): Unit = {
+    executionThread.start()
+  }
+
+  /** Joins the background execution thread after it is finished. */
+  def join(): Unit = {
+    executionThread.join()
+  }
+
+  /** Interrupt the executing thread. */
+  def interrupt(): Unit = {
+    synchronized {
+      interrupted = true
+      executionThread.interrupt()
+    }
+  }
+
+  private def execute(): Unit = {
+    // Outer execute handles errors.
+    // Separate it from executeInternal to save on indent and improve readability.
+    try {
+      try {
+        executeInternal()
+      } catch {
+        // Need to catch Throwable instead of NonFatal, because e.g. InterruptedException is fatal.
+        case e: Throwable =>
+          logDebug(s"Exception in execute: $e")
+          // Always cancel all remaining execution after error.
+          executeHolder.sessionHolder.session.sparkContext.cancelJobsWithTag(executeHolder.jobTag)

Review Comment:
   This doesn't wait and doesn't throw; it just sends an async ping to the DAGScheduler to cancel whatever is left running. See SparkContext.cancelJobsWithTag -> DAGScheduler.cancelJobsWithTag.
   This is "to be safe and clean everything up" like in https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L248 in Thriftserver.
   
   I plan to add two more types of cancellation:
   - per-query (requires operation_id, to be introduced in a next PR that also deals with query reattach)
   - per user-settable tag (very similar to SparkContext.addJobTag / cancelJobsWithTag, but on a Spark Connect SparkSession); a rough sketch of the tag mechanics is below
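
   A rough, self-contained sketch of that async, tag-based cancellation (local-mode session and tag name are made up for illustration; not code from this PR):

   ```scala
   import org.apache.spark.sql.SparkSession

   object TagCancellationSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[2]").getOrCreate()
       val sc = spark.sparkContext

       // The tag lives in thread-local job properties and is inherited by the
       // thread created below (InheritableThreadLocal).
       sc.addJobTag("connect-operation-123")
       val runner = new Thread(() => {
         try sc.range(0, Long.MaxValue).count() // a long-running job to cancel
         catch { case e: Exception => println(s"job ended: ${e.getMessage}") }
       })
       runner.start()

       Thread.sleep(2000)
       // Asynchronous: posts a cancellation to the DAGScheduler and returns.
       sc.cancelJobsWithTag("connect-operation-123")
       runner.join()
       spark.stop()
     }
   }
   ```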





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1263105233


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecuteHolder
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.util.Utils
+
+/**
+ * This class launches the actual execution in an execution thread. The execution pushes the
+ * responses to an ExecuteResponseObserver in executeHolder.
+ */
+private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends Logging {
+
+  // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+  // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+  // forwarding of thread locals needs to be taken into account.
+  private var executionThread: Thread = new ExecutionThread()
+
+  private var interrupted: Boolean = false
+
+  /** Launches the execution in a background thread, returns immediately. */
+  def start(): Unit = {
+    executionThread.start()
+  }
+
+  /** Joins the background execution thread after it is finished. */
+  def join(): Unit = {
+    executionThread.join()
+  }
+
+  /** Interrupt the executing thread. */
+  def interrupt(): Unit = {
+    synchronized {
+      interrupted = true
+      executionThread.interrupt()
+    }
+  }
+
+  private def execute(): Unit = {
+    // Outer execute handles errors.
+    // Separate it from executeInternal to save on indent and improve readability.
+    try {
+      try {
+        executeInternal()
+      } catch {
+        // Need to catch Throwable instead of NonFatal, because e.g. InterruptedException is fatal.
+        case e: Throwable =>
+          logDebug(s"Exception in execute: $e")
+          // Always cancel all remaining execution after error.
+          executeHolder.sessionHolder.session.sparkContext.cancelJobsWithTag(executeHolder.jobTag)

Review Comment:
   This doesn't wait; it just sends an async ping to the DAGScheduler to cancel whatever is left running.
   I do plan to add two more types of cancellation:
   - per-query (requires operation_id, to be introduced in a next PR that also deals with query reattach)
   - per user-settable tag (very similar to SparkContext.addJobTag / cancelJobsWithTag, but on a Spark Connect SparkSession)





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260962972


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.

Review Comment:
   `lastIndex` is used as the last `CachedExecutePlanResponse.index` after the stream completes.





[GitHub] [spark] xuanyuanking closed pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "xuanyuanking (via GitHub)" <gi...@apache.org>.
xuanyuanking closed pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread
URL: https://github.com/apache/spark/pull/41315




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1261443353


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }
+    }
+
+    try {
+      // Start execution thread...
+      this.executionThread.start()
+      // ... and wait for execution thread to finish.
+      // TODO: Detach execution from RPC request. Then this can return early, and results
+      // are served to the client via additional RPCs from ExecutePlanResponseObserver.
+      this.executionThread.join()
+
+      executionError.foreach { error =>
+        logDebug(s"executionError: ${error}")
+        throw error
+      }
+    } catch {
+      case NonFatal(e) =>
+        // In case of exception happening on the handler thread, interrupt the underlying execution.
+        this.interrupt()

Review Comment:
   No, it was to clean up the execution just in case. This was refactored since.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260968207


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be sent after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)

Review Comment:
   `(index - responses(0).index).toInt` will be negative, so it fails the check in the `if` above, and `None` will be returned.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260974008


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseSender.scala:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * ExecutePlanResponseSender sends responses to the GRPC stream.
+ * It runs on the RPC thread, and gets notified by ExecutePlanResponseObserver about available
+ * responses.
+ * It notifies the ExecutePlanResponseObserver back about cached responses that can be removed
+ * after being sent out.
+ * @param responseObserver the GRPC request StreamObserver
+ */
+private[connect] class ExecutePlanResponseSender(
+  grpcObserver: StreamObserver[ExecutePlanResponse]) extends Logging {
+
+  private var detached = false
+
+  /** Detach this sender from the executionObserver.
+   *  Called only from the executionObserver that this sender is attached to.
+   *  The executionObserver holds the lock, and needs to notify after this call. */
+  def detach(): Unit = {
+    if (detached) {
+      throw new IllegalStateException("ExecutePlanResponseSender already detached!")
+    }
+    detached = true
+  }
+
+  /**
+   * Receive responses from executionObserver and send them to grpcObserver.
+   * @param lastSentIndex Start sending the stream from the response after this index.
+   * @return true if the sender was detached before the stream completed;
+   *         the caller needs to finish the grpcObserver stream.
+   *         false if the stream finished; in this case the grpcObserver stream is already completed.
+   */
+  def run(executionObserver: ExecutePlanResponseObserver, lastSentIndex: Long): Boolean = {

Review Comment:
   Yes. The name of the class, ResponseSender, is supposed to mean that it pushes messages to the gRPC observer.
   In this instance it pushes all messages. With reattaching, it will gain options to finish sooner (time based, size based, number-of-responses based...); a hypothetical sketch of such a bounded loop is below.
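
   A hypothetical sketch of such a bounded send loop (all names and thresholds are invented; not the PR's API):

   ```scala
   // Streams cached responses until the cache runs dry or a size/time budget
   // is exhausted; returns the first index that was not sent, so a
   // reattaching sender can resume from there.
   class BoundedSender(maxBytes: Long, maxMillis: Long) {
     def run(sizeOfResponse: Long => Option[Long], firstIndex: Long): Long = {
       val deadline = System.currentTimeMillis() + maxMillis
       var sentBytes = 0L
       var index = firstIndex
       var more = true
       while (more && System.currentTimeMillis() < deadline && sentBytes < maxBytes) {
         sizeOfResponse(index) match {
           case Some(size) =>
             // grpcObserver.onNext(response) would happen here in a real sender.
             sentBytes += size
             index += 1
           case None => more = false // nothing cached yet: wait, detach, or finish
         }
       }
       index
     }
   }
   ```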





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260958887


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be sent after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {

Review Comment:
   My intention was for the consumer to decide that, but it's open to tweaking whether it should be the producer deciding it... It may indeed be easier and cleaner to let the producer do this, though then the producer must know the type of the consumer - e.g. for a non-reattachable consumer it can clear responses immediately after handing them out, while for a reattachable consumer it needs to keep some back buffer in case a response got lost and a reattach needs to go back.
   But again, it indeed seems that this policy may be cleaner on the producer side than on the consumer side... A hypothetical sketch of producer-side retention is below.
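
   A hypothetical sketch of producer-side retention (class and field names invented; not the PR's code):

   ```scala
   import scala.collection.mutable.ListBuffer

   // After the sender confirms delivery up to sentIndex, a reattachable
   // stream keeps a trailing back buffer for possible replay, while a plain
   // stream immediately frees everything that was already sent.
   class RetentionSketch(reattachable: Boolean, backBuffer: Long) {
     private val cached = ListBuffer[Long]() // stand-in for cached response indexes

     def offer(index: Long): Unit = synchronized { cached += index }

     def released(sentIndex: Long): Unit = synchronized {
       val freeUpTo = if (reattachable) sentIndex - backBuffer else sentIndex
       while (cached.nonEmpty && cached.head <= freeUpTo) {
         cached.remove(0)
       }
     }
   }
   ```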





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260961841


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be sent after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {

Review Comment:
   Again, this conflates the index in `responses` with `CachedExecutePlanResponse.index`... I will try to make it cleaner and clearer.
   Maybe just a `Map` instead of a `ListBuffer`, without tricks like `responses(index - responses(0).index)`, would make it look cleaner, and I don't think a map would have worse complexity... A minimal sketch of that alternative is below.
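
   A minimal sketch of the Map-based alternative (invented names; not the PR's code):

   ```scala
   import scala.collection.mutable

   // Keying the cache by the stream index removes the offset arithmetic:
   // an absent index simply yields None, with no negative-index edge cases.
   class ResponseCache[T] {
     private val responses = mutable.Map.empty[Long, T]

     def put(index: Long, r: T): Unit = synchronized { responses(index) = r }

     def getResponse(index: Long): Option[T] = synchronized {
       responses.get(index)
     }

     def removeUntilIndex(index: Long): Unit = synchronized {
       responses.keys.filter(_ <= index).toList.foreach(responses.remove)
     }
   }
   ```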





[GitHub] [spark] rangadi commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1262927094


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecuteHolder
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.util.Utils
+
+/**
+ * This class launches the actual execution in an execution thread. The execution pushes the
+ * responses to an ExecuteResponseObserver in executeHolder.
+ */
+private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends Logging {
+
+  // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+  // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+  // forwarding of thread locals needs to be taken into account.
+  private var executionThread: Thread = new ExecutionThread()
+
+  private var interrupted: Boolean = false
+
+  /** Launches the execution in a background thread, returns immediately. */
+  def start(): Unit = {
+    executionThread.start()
+  }
+
+  /** Joins the background execution thread after it is finished. */
+  def join(): Unit = {
+    executionThread.join()
+  }
+
+  /** Interrupt the executing thread. */
+  def interrupt(): Unit = {
+    synchronized {
+      interrupted = true
+      executionThread.interrupt()
+    }
+  }
+
+  private def execute(): Unit = {
+    // Outer execute handles errors.
+    // Separate it from executeInternal to save on indent and improve readability.
+    try {
+      try {
+        executeInternal()
+      } catch {
+        // Need to catch Throwable instead of NonFatal, because e.g. InterruptedException is fatal.
+        case e: Throwable =>
+          logDebug(s"Exception in execute: $e")
+          // Always cancel all remaining execution after error.
+          executeHolder.sessionHolder.session.sparkContext.cancelJobsWithTag(executeHolder.jobTag)

Review Comment:
   Does this wait for the cancellation to succeed? What happens if this throws?
   I see that currently the only way is to do `session.interruptAll()`.
   Wondering if we plan to provide a per-query API to cancel, and whether it would have any contract regarding cancellation.
   





[GitHub] [spark] hvanhovell commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1213566557


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }
+    }
+
+    try {
+      // Start execution thread...
+      this.executionThread.start()
+      // ... and wait for execution thread to finish.
+      // TODO: Detach execution from RPC request. Then this can return early, and results
+      // are served to the client via additional RPCs from ExecutePlanResponseObserver.
+      this.executionThread.join()
+
+      executionError.foreach { error =>
+        logDebug(s"executionError: ${error}")
+        throw error
+      }
+    } catch {
+      case NonFatal(e) =>
+        // In case of exception happening on the handler thread, interrupt the underlying execution.
+        this.interrupt()
+        throw e
+    }
+  }
+
+  protected def execute() = SparkConnectArtifactManager.withArtifactClassLoader {
+    try {
+      // synchronized - check if already got interrupted while starting.
+      synchronized {
+        if (interrupted) {
+          throw new InterruptedException()
+        }
+      }
+
+      session.withActive {
+        val debugString = requestString(executePlanRequest.get)
+
+        session.sparkContext.setJobGroup(
+          jobGroupId,
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}",
+          interruptOnCancel = true)
+
+        // Add debug information to the query execution so that the jobs are traceable.
+        session.sparkContext.setLocalProperty(
+          "callSite.short",
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+        session.sparkContext.setLocalProperty(
+          "callSite.long",
+          StringUtils.abbreviate(debugString, 2048))
+
+        executePlanRequest.foreach { request =>
+          request.getPlan.getOpTypeCase match {
+            case proto.Plan.OpTypeCase.COMMAND => handleCommand(request)
+            case proto.Plan.OpTypeCase.ROOT => handlePlan(request)
+            case _ =>
+              throw new UnsupportedOperationException(
+                s"${request.getPlan.getOpTypeCase} not supported.")
+          }
+        }
+      }
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>

Review Comment:
   Please rethrow this?
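   One possible shape (a sketch only, reusing the `executionError` field from this diff; not the PR's actual fix) would be to record the error and still rethrow it:
   
   ```
   case e: Throwable =>
     logDebug(s"Exception in execute: $e")
     // Record the error so it can be surfaced to the client...
     executionError = Some(e)
     // ...but rethrow so that fatal failures are not silently swallowed.
     throw e
   ```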





[GitHub] [spark] hvanhovell commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1213484264


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {

Review Comment:
   Please make it a normal class.





[GitHub] [spark] grundprinzip commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1206882959


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }

Review Comment:
   Is there a particular reason for the nesting? Why not create an ExecutionThread that inherits from Thread, instead of creating a thread that keeps a reference to the enclosing object and calls back into it?
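   E.g., a hypothetical sketch (names are mine, not from this PR):
   
   ```
   private class ExecutionThread extends Thread {
     override def run(): Unit = execute()
   }
   
   this.executionThread = new ExecutionThread()
   ```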



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }
+    }
+
+    try {
+      // Start execution thread..
+      this.executionThread.start()
+      // ... and wait for execution thread to finish.
+      // TODO: Detach execution from RPC request. Then this can return early, and results
+      // are served to the client via additional RPCs from ExecutePlanResponseObserver.
+      this.executionThread.join()
+
+      executionError.foreach { error =>
+        logDebug(s"executionError: ${error}")
+        throw error
+      }
+    } catch {
+      case NonFatal(e) =>
+        // In case of exception happening on the handler thread, interrupt the underlying execution.
+        this.interrupt()

Review Comment:
   Does this assume that the execution thread was interrupted?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {
+        override def run(): Unit = {
+          execute()
+        }
+      }
+    }
+
+    try {
+      // Start execution thread..
+      this.executionThread.start()
+      // ... and wait for execution thread to finish.
+      // TODO: Detach execution from RPC request. Then this can return early, and results
+      // are served to the client via additional RPCs from ExecutePlanResponseObserver.
+      this.executionThread.join()
+
+      executionError.foreach { error =>
+        logDebug(s"executionError: ${error}")
+        throw error
+      }
+    } catch {
+      case NonFatal(e) =>
+        // In case of exception happening on the handler thread, interrupt the underlying execution.
+        this.interrupt()
+        throw e
+    }
+  }
+
+  protected def execute() = SparkConnectArtifactManager.withArtifactClassLoader {
+    try {
+      // synchronized - check if already got interrupted while starting.
+      synchronized {
+        if (interrupted) {
+          throw new InterruptedException()
+        }
+      }
+
+      session.withActive {
+        val debugString = requestString(executePlanRequest.get)
+
+        session.sparkContext.setJobGroup(
+          jobGroupId,
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}",
+          interruptOnCancel = true)
+
+        // Add debug information to the query execution so that the jobs are traceable.
+        session.sparkContext.setLocalProperty(
+          "callSite.short",
+          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+        session.sparkContext.setLocalProperty(
+          "callSite.long",
+          StringUtils.abbreviate(debugString, 2048))
+
+        executePlanRequest.foreach { request =>
+          request.getPlan.getOpTypeCase match {
+            case proto.Plan.OpTypeCase.COMMAND => handleCommand(request)
+            case proto.Plan.OpTypeCase.ROOT => handlePlan(request)
+            case _ =>
+              throw new UnsupportedOperationException(
+                s"${request.getPlan.getOpTypeCase} not supported.")
+          }
+        }
+      }
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        // scalastyle:off
+        logDebug(s"Exception in execute: $e")
+        // Always cancel all remaining execution after error.
+        sessionHolder.session.sparkContext.cancelJobGroup(jobGroupId)
+        executionError = if (interrupted) {
+          // Turn the interrupt into OPERATION_CANCELLED error.
+          Some(new SparkSQLException("OPERATION_CANCELLED", Map.empty))
+        } else {
+          // Return the originally thrown error.
+          Some(e)
+        }
+    } finally {
+      session.sparkContext.clearJobGroup()
+    }
+  }
+
+  def interrupt(): Unit = {

Review Comment:
   ```suggestion
     def interrupt(): Unit = synchronized {
   ```
   might save you an indent.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()

Review Comment:
   You have too many execute-like things here.
   
   You have 
   
   ```
   def run()
      startExecute()
         execute()
   ```
   
   I'm wondering if this can be named better. 
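   E.g., a hypothetical renaming sketch (names are illustrative only):
   
   ```
   class ExecutionHolderSketch {
     def run(): Unit = launchAndWait()       // public entry point
   
     private def launchAndWait(): Unit = {   // was startExecute()
       val thread = new Thread(() => executeOnThread())
       thread.start()
       thread.join()
     }
   
     private def executeOnThread(): Unit = { /* actual work */ }  // was execute()
   }
   ```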



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
+import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecutionHolder
+import org.apache.spark.sql.execution.{LocalTableScanExec, SQLExecution}
+import org.apache.spark.sql.execution.arrow.ArrowConverters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Handle ExecutePlanRequest where the operation to handle is Plan execution of type
+ * proto.Plan.OpTypeCase.ROOT
+ * @param executionHolder
+ */
+class SparkConnectPlanExecution(executionHolder: ExecutionHolder) {

Review Comment:
   should this be package private?
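   If so, that's a one-line change (sketch):
   
   ```
   private[connect] class SparkConnectPlanExecution(executionHolder: ExecutionHolder) {
   ```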



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()

Review Comment:
   nit: `runInternal`? Just from the naming, `start` feels weird because you're already in `run`.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {

Review Comment:
   Is there a reason for using `this.`?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
+import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecutionHolder
+import org.apache.spark.sql.execution.{LocalTableScanExec, SQLExecution}
+import org.apache.spark.sql.execution.arrow.ArrowConverters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Handle ExecutePlanRequest where the operatoin to handle is Plan execution of type

Review Comment:
   ```suggestion
    * Handle ExecutePlanRequest where the operation to handle is of `Plan` type
   ```



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -33,26 +32,26 @@ import org.apache.spark.sql.SparkSession
 case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
     extends Logging {
 
-  val executePlanOperations: ConcurrentMap[String, ExecutePlanHolder] =
-    new ConcurrentHashMap[String, ExecutePlanHolder]()
+  val executions: ConcurrentMap[String, ExecutionHolder] =
+    new ConcurrentHashMap[String, ExecutionHolder]()
 
-  private[connect] def createExecutePlanHolder(
-      request: proto.ExecutePlanRequest): ExecutePlanHolder = {
+  private[connect] def createExecutionHolder(): ExecutionHolder = {
 
     val operationId = UUID.randomUUID().toString
-    val executePlanHolder = ExecutePlanHolder(operationId, this, request)
-    assert(executePlanOperations.putIfAbsent(operationId, executePlanHolder) == null)
+    val executePlanHolder = ExecutionHolder(operationId, this)
+    assert(executions.putIfAbsent(operationId, executePlanHolder) == null)

Review Comment:
   Aren't we removing all of the asserts from the actual code? Should we then just make it regular exception-handling code? I'm a huge fan of invariant assertions though :(
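   If the assert has to go, the invariant could become regular error handling (a sketch using the names from this diff):
   
   ```
   val previous = executions.putIfAbsent(operationId, executePlanHolder)
   if (previous != null) {
     throw new IllegalStateException(
       s"ExecutionHolder for operation $operationId already exists.")
   }
   ```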





[GitHub] [spark] juliuszsompolski commented on pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #41315:
URL: https://github.com/apache/spark/pull/41315#issuecomment-1625557137

   Ignore for now - ExecutePlanResponseSender is unfinished; I didn't go through the nit comments yet.




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1261446404


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutionHolder.scala:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import io.grpc.stub.StreamObserver
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to hold the Spark Connect execution state, and perform the execution.
+ */
+case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
+
+  val jobGroupId =
+    s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
+
+  val session = sessionHolder.session
+
+  var executePlanRequest: Option[proto.ExecutePlanRequest] = None
+
+  var executePlanResponseObserver: Option[ExecutePlanResponseObserver] = None
+
+  private var executionThread: Thread = null
+
+  private var executionError: Option[Throwable] = None
+
+  private var interrupted: Boolean = false
+
+  def run(
+      request: proto.ExecutePlanRequest,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    // Set the state of what needs to be run.
+    this.executePlanRequest = Some(request)
+    this.executePlanResponseObserver = Some(new ExecutePlanResponseObserver(responseObserver))
+    // And start the execution.
+    startExecute()
+  }
+
+  protected def startExecute(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread = new Thread() {

Review Comment:
   That was me not remembering whether one has to use `this.` in Scala or not.
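   For the record, `this.` is optional in Scala unless a parameter or local name shadows the field (illustrative snippet, not PR code):
   
   ```
   class Holder {
     private var executionThread: Thread = null
   
     def set(): Unit = {
       executionThread = new Thread()            // fine without this.
     }
   
     def set(executionThread: Thread): Unit = {
       this.executionThread = executionThread    // this. needed to disambiguate
     }
   }
   ```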





[GitHub] [spark] grundprinzip commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1258756120


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseSender.scala:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+
+/**
+ * ExecutePlanResponseSender sends responses to the GRPC stream.
+ * It runs on the RPC thread, and gets notified by ExecutePlanResponseObserver about available
+ * responses.
+ * It notifies the ExecutePlanResponseObserver back about cached responses that can be removed
+ * after being sent out.
+ * @param grpcObserver the GRPC request StreamObserver
+ */
+private[connect] class ExecutePlanResponseSender(
+  executionObserver: ExecutePlanResponseObserver,
+  grpcObserver: StreamObserver[ExecutePlanResponse]) {
+
+  executionObserver.setExecutePlanResponseSender(this)
+
+  private val signal = new Object
+  private var detached = false
+
+  def notifyResponse(): Unit = {
+    signal.synchronized {
+      signal.notify()
+    }
+  }
+
+  def detach(): Unit = {
+    signal.synchronized {
+      this.detached = true
+      signal.notify()
+    }
+  }
+
+  /**
+   * Receive responses from executionObserver and send them to grpcObserver.
+   * @param lastSentIndex Start sending the stream from response after this.
+   * @return true if finished because stream completed
+   *         false if finished because stream was detached
+   */
+  def run(lastSentIndex: Long): Boolean = {
+    // register to be notified about available responses.
+    executionObserver.setExecutePlanResponseSender(this)
+
+    var currentIndex = lastSentIndex + 1
+    var finished = false
+
+    while (!finished) {
+      var response: Option[CachedExecutePlanResponse] = None
+      // Get next available response.
+      // Wait until either this sender got detached or next response is ready,
+      // or the stream is complete and it had already sent all responses.
+      signal.synchronized {
+        while (!detached && response.isEmpty &&
+            executionObserver.getLastIndex().forall(currentIndex <= _)) {
+          response = executionObserver.getResponse(currentIndex)
+          // If response is empty, wait to get notified.
+          // We are holding signal here, so:
+          // - if detach() is waiting on signal, it will acquire it after wait()
+          //   here releases it, and wait() will be woken up by the notify.
+          // - if getLastIndex() or getResponse() changed, executionObserver would call
+          //   notify(), which acquires signal after wait() here releases it, and
+          //   wait() will be woken up by the notify.
+          if (response.isEmpty) {
+            signal.wait()
+          }
+        }
+      }
+
+      // Send next available response.
+      if (detached) {
+        // This sender got detached by the observer.
+        finished = true
+      } else if (response.isDefined) {
+        // There is a response available to be sent.
+        grpcObserver.onNext(response.get.r)
+        // Remove after sending.
+        executionObserver.removeUntilIndex(currentIndex)
+        currentIndex += 1
+      } else if (executionObserver.getLastIndex().forall(currentIndex > _)) {

Review Comment:
   ```suggestion
         } else if (executionObserver.getLastIndex().forall(currentIndex >= _)) {
   ```
   
   ?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ */
+private[connect] class ExecutePlanResponseObserver() extends StreamObserver[ExecutePlanResponse] {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var completed: Boolean = false
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    notifySender()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)

Review Comment:
   And we were just talking about this. It's great that we cache this here for an explicit error response to pick up.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseSender.scala:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+
+/**
+ * ExecutePlanResponseSender sends responses to the GRPC stream.
+ * It runs on the RPC thread, and gets notified by ExecutePlanResponseObserver about available
+ * responses.
+ * It notifies the ExecutePlanResponseObserver back about cached responses that can be removed
+ * after being sent out.
+ * @param grpcObserver the GRPC request StreamObserver
+ */
+private[connect] class ExecutePlanResponseSender(
+  executionObserver: ExecutePlanResponseObserver,
+  grpcObserver: StreamObserver[ExecutePlanResponse]) {
+
+  executionObserver.setExecutePlanResponseSender(this)
+
+  private val signal = new Object
+  private var detached = false
+
+  def notifyResponse(): Unit = {
+    signal.synchronized {
+      signal.notify()
+    }
+  }
+
+  def detach(): Unit = {
+    signal.synchronized {
+      this.detached = true
+      signal.notify()
+    }
+  }
+
+  /**
+   * Receive responses from executionObserver and send them to grpcObserver.
+   * @param lastSentIndex Start sending the stream from response after this.
+   * @return true if finished because stream completed
+   *         false if finished because stream was detached
+   */
+  def run(lastSentIndex: Long): Boolean = {
+    // register to be notified about available responses.
+    executionObserver.setExecutePlanResponseSender(this)
+
+    var currentIndex = lastSentIndex + 1
+    var finished = false
+
+    while (!finished) {
+      var response: Option[CachedExecutePlanResponse] = None
+      // Get next available response.
+      // Wait until either this sender got detached or next response is ready,
+      // or the stream is complete and it had already sent all responses.
+      signal.synchronized {
+        while (!detached && response.isEmpty &&
+            executionObserver.getLastIndex().forall(currentIndex <= _)) {
+          response = executionObserver.getResponse(currentIndex)
+          // If response is empty, wait to get notified.
+          // We are holding signal here, so:
+          // - if detach() is waiting on signal, it will acquire it after wait()
+          //   here releases it, and wait() will be woken up by the notify.
+          // - if getLastIndex() or getResponse() changed, executionObserver would call
+          //   notify(), which acquires signal after wait() here releases it, and
+          //   wait() will be woken up by the notify.
+          if (response.isEmpty) {
+            signal.wait()
+          }
+        }
+      }
+
+      // Send next available response.
+      if (detached) {
+        // This sender got detached by the observer.
+        finished = true
+      } else if (response.isDefined) {
+        // There is a response available to be sent.
+        grpcObserver.onNext(response.get.r)
+        // Remove after sending.
+        executionObserver.removeUntilIndex(currentIndex)
+        currentIndex += 1
+      } else if (executionObserver.getLastIndex().forall(currentIndex > _)) {

Review Comment:
   At least, if I'm not wrong: with a one-batch response you start with `currentIndex=0`, which is incremented after fetching it, so `currentIndex=1`; but now `lastIndex` will be 1 as well, and `currentIndex` is not > `lastIndex`.
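   For what it's worth, a tiny runnable trace of the `>` boundary, assuming the observer's 1-based indexing (values are hypothetical):
   
   ```
   val lastIndex: Option[Long] = Some(1L)       // stream completed after response 1
   var currentIndex = 1L                        // run(lastSentIndex = 0) starts here
   println(lastIndex.forall(currentIndex > _))  // false: response 1 not sent yet
   currentIndex += 1                            // response 1 was fetched and sent
   println(lastIndex.forall(currentIndex > _))  // true: stream complete
   ```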





[GitHub] [spark] grundprinzip commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260927450


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ */
+private[connect] class ExecutePlanResponseObserver() extends StreamObserver[ExecutePlanResponse] {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var completed: Boolean = false
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    notifySender()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)

Review Comment:
   The gRPC errors are not just for internal system errors. The state of the stream is undefined after an error has been thrown, and I think it's fair to close it with onError. Using the response trailers for exceptions that contain an 8MB query plan and description is not great.
   
   My current thinking is to leverage the reattach RPC to fetch the last error and get a high-fidelity response message with all of the metadata of the error.





[GitHub] [spark] grundprinzip commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1260099890


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponse responses.
+ *
+ * This StreamObserver runs on the execution thread and saves the responses;
+ * it notifies the ExecutePlanResponseSender about available responses.
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None

Review Comment:
   Do you want to document that the specific instance of the `ExecutePlanResponseSender` might change because it is the connection to the actual gRPC response handler?
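   
   A possible doc comment, as a sketch:
   
   ```scala
   // The attached sender can change over the lifetime of the execution: each
   // (re)attaching RPC creates a new ExecutePlanResponseSender wired to its
   // own gRPC response handler, and setExecutePlanResponseSender detaches the
   // previous sender before attaching the new one.
   private var responseSender: Option[ExecutePlanResponseSender] = None
   ```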



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.

Review Comment:
   ```suggestion
     // Sender to notify of available responses.
   ```



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1

Review Comment:
   For consistency, instead of being called `index`, should this then be `lastIndex`, since that makes the intent clearer? Otherwise, readers might believe that this is a zero-based offset into `responses`.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be send after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {

Review Comment:
   Should this be public to the consumer? Do you want to make it a two-step process for removing and getting responses?
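   
   For illustration, a single synchronized accessor could fold both steps together (a hypothetical method sketched against this class's fields, not code from the PR):
   
   ```scala
   // Prune everything the consumer has already seen and hand back the next
   // response under one lock, so removeUntilIndex never needs to be public.
   def consumeResponse(lastSeenIndex: Long): Option[CachedExecutePlanResponse] =
     synchronized {
       while (responses.nonEmpty && responses.head.index <= lastSeenIndex) {
         responses.remove(0)
       }
       // The next response has index lastSeenIndex + 1, if produced already.
       responses.headOption.filter(_.index == lastSeenIndex + 1)
     }
   ```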



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be send after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {

Review Comment:
   This is a bit weird: you use the `responses` size to allow concurrent fetching while the query is not done, but keep track of the overall `lastIndex` as well.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be send after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)
+      assert(ret.index == index)
+      Some(ret)
+    } else {
+      None
+    }
+  }
+
+  /** Get the stream error, if set.  */
+  def getError(): Option[Throwable] = synchronized {
+    error
+  }
+
+  /** If the stream is finished, the index of the last response, otherwise unset. */
+  def getLastIndex(): Option[Long] = synchronized {
+    lastIndex
+  }
+
+  def completed(): Boolean = synchronized {
+    lastIndex.isDefined

Review Comment:
   Why not make `lastIndex` a flag `completed` instead?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be send after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)

Review Comment:
   Passing in a negative index will probably crash and burn.
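   
   A guarded variant of the same method could validate the requested index against the cached window first (a sketch, not the PR's code):
   
   ```scala
   /** Get the response with the given stream index, if it is still cached. */
   def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
     responses.headOption match {
       // Only compute a buffer offset when the index falls inside the cached
       // window; negative or out-of-range indices safely return None.
       case Some(first) if index >= first.index &&
           index < first.index + responses.size =>
         val ret = responses((index - first.index).toInt)
         assert(ret.index == index)
         Some(ret)
       case _ => None
     }
   }
   ```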



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteRunner.scala:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.Message
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.SparkSQLException
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecuteHolder
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.util.Utils
+
+private[connect] class ExecuteRunner(executeHolder: ExecuteHolder) extends Logging {
+
+  private var executionThread: Thread = new ExecutionThread()
+
+  private var interrupted: Boolean = false
+
+  def start(): Unit = {
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+      // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+      // forwarding of thread locals needs to be taken into account.
+      this.executionThread.start()
+    }
+  }
+
+  private def execute(): Unit = {
+    // Outer execute
+    try {
+      try {
+        execute()

Review Comment:
   ```suggestion
           executeInternal()
   ```
   ??



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.

Review Comment:
   I think `lastIndex` should be renamed to something like `finalResponseSize` to indicate that it's used as a size value for the responses instead of an actual index.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1

Review Comment:
   ```suggestion
     private var lastIndexSent: Long = 0 // first response will have index 1
   ```



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseSender.scala:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * ExecutePlanResponseSender sends responses to the GRPC stream.
+ * It runs on the RPC thread, and gets notified by ExecutePlanResponseObserver about available
+ * responses.
+ * It notifies the ExecutePlanResponseObserver back about cached responses that can be removed
+ * after being sent out.
+ * @param responseObserver the GRPC request StreamObserver
+ */
+private[connect] class ExecutePlanResponseSender(
+  grpcObserver: StreamObserver[ExecutePlanResponse]) extends Logging {
+
+  private var detached = false
+
+  /** Detach this sender from executionObserver.
+   *  Called only from executionObserver that this sender is attached to.
+   *  executionObserver holds lock, and needs to notify after this call. */
+  def detach(): Unit = {
+    if (detached == true) {
+      throw new IllegalStateException("ExecutePlanResponseSender already detached!")
+    }
+    detached = true
+  }
+
+  /**
+   * Receive responses from executionObserver and send them to grpcObserver.
+   * @param lastSentIndex Start sending the stream from response after this.
+   * @return true if the execution was detached before stream completed.
+   *         The caller needs to finish the grpcObserver stream
+   *         false if stream was finished. In this case, grpcObserver stream is already completed.
+   */
+  def run(executionObserver: ExecutePlanResponseObserver, lastSentIndex: Long): Boolean = {

Review Comment:
   Do I read this method correctly that this will try to push all messages to the gRPC observer?
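   
   For reference, roughly the loop shape this implies (a sketch only, not the exact implementation; it assumes the cached wrapper exposes the proto as `.response`):
   
   ```scala
   def run(executionObserver: ExecutePlanResponseObserver, lastSentIndex: Long): Boolean = {
     var nextIndex = lastSentIndex + 1
     var finished = false
     while (!detached && !finished) {
       val next = executionObserver.synchronized {
         // Wait until the next response is cached, the stream completes,
         // or a newer sender detaches this one.
         while (executionObserver.getResponse(nextIndex).isEmpty &&
             executionObserver.getLastIndex().isEmpty && !detached) {
           executionObserver.wait() // woken by the observer's notifyAll()
         }
         executionObserver.getResponse(nextIndex)
       }
       next match {
         case Some(cached) if !detached =>
           grpcObserver.onNext(cached.response) // push one message downstream
           nextIndex += 1
         case None if !detached =>
           // Stream finished: propagate the cached error, or complete cleanly.
           executionObserver.getError() match {
             case Some(t) => grpcObserver.onError(t)
             case None => grpcObserver.onCompleted()
           }
           finished = true
         case _ => // detached; the new sender takes over the gRPC stream
       }
     }
     detached
   }
   ```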



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be send after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)
+      assert(ret.index == index)
+      Some(ret)
+    } else {
+      None

Review Comment:
   Should it throw, or return None?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseObserver.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.mutable.ListBuffer
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * Container for ExecutePlanResponses responses.
+ *
+ * This StreamObserver is running on the execution thread and saves the responses,
+ * it notifies the ExecutePlanResponseSender about available responses.
+ *
+ * @param responseObserver
+ */
+private[connect] class ExecutePlanResponseObserver()
+  extends StreamObserver[ExecutePlanResponse]
+  with Logging {
+
+  // Cached stream state.
+  private val responses = new ListBuffer[CachedExecutePlanResponse]()
+  private var error: Option[Throwable] = None
+  private var lastIndex: Option[Long] = None // index of last response before completed.
+  private var index: Long = 0 // first response will have index 1
+
+  // sender to notify of available responses.
+  private var responseSender: Option[ExecutePlanResponseSender] = None
+
+  def onNext(r: ExecutePlanResponse): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onNext can't be called after stream completed")
+    }
+    index += 1
+    responses += CachedExecutePlanResponse(r, index)
+    logDebug(s"Saved response with index=$index")
+    notifyAll()
+  }
+
+  def onError(t: Throwable): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onError can't be called after stream completed")
+    }
+    error = Some(t)
+    lastIndex = Some(index) // no responses to be send after error.
+    logDebug(s"Error. Last stream index is $index.")
+    notifyAll()
+  }
+
+  def onCompleted(): Unit = synchronized {
+    if (lastIndex.nonEmpty) {
+      throw new IllegalStateException("Stream onCompleted can't be called after stream completed")
+    }
+    lastIndex = Some(index)
+    logDebug(s"Completed. Last stream index is $index.")
+    notifyAll()
+  }
+
+  /** Set a new response sender. */
+  def setExecutePlanResponseSender(newSender: ExecutePlanResponseSender): Unit = synchronized {
+    // detach the current sender before attaching new one
+    // this.synchronized() needs to be held while detaching a sender, and the detached sender
+    // needs to be notified with notifyAll() afterwards.
+    responseSender.foreach(_.detach())
+    responseSender = Some(newSender)
+    notifyAll()
+  }
+
+  /** Remove cached responses until index */
+  def removeUntilIndex(index: Long): Unit = synchronized {
+    while (responses.nonEmpty && responses(0).index <= index) {
+      responses.remove(0)
+    }
+    logDebug(s"Removed saved responses until index $index.")
+  }
+
+  /** Get response with a given index in the stream, if set. */
+  def getResponse(index: Long): Option[CachedExecutePlanResponse] = synchronized {
+    if (responses.nonEmpty && (index - responses(0).index).toInt < responses.size) {
+      // Note: index access in ListBuffer is linear; we assume here the buffer is not too long.
+      val ret = responses((index - responses(0).index).toInt)
+      assert(ret.index == index)
+      Some(ret)
+    } else {
+      None
+    }
+  }
+
+  /** Get the stream error, if set.  */
+  def getError(): Option[Throwable] = synchronized {
+    error
+  }
+
+  /** If the stream is finished, the index of the last response, otherwise unset. */
+  def getLastIndex(): Option[Long] = synchronized {
+    lastIndex

Review Comment:
   ... and then you can return here `if completed return responses.size else None`
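   
   Sketched out (a hypothetical refactor, with one caveat: since cached responses can be pruned, the running `index` counter is a safer source for the final count than `responses.size`):
   
   ```scala
   private var completed: Boolean = false // replaces lastIndex
   
   /** If the stream is finished, the index of the last response, otherwise unset. */
   def getLastIndex(): Option[Long] = synchronized {
     // Indices are 1-based and contiguous, so the total number of responses
     // produced (tracked in `index`) is also the index of the last one.
     if (completed) Some(index) else None
   }
   ```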



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecutePlanResponseSender.scala:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+
+/**
+ * ExecutePlanResponseSender sends responses to the GRPC stream.
+ * It runs on the RPC thread, and gets notified by ExecutePlanResponseObserver about available
+ * responses.
+ * It notifies the ExecutePlanResponseObserver back about cached responses that can be removed
+ * after being sent out.
+ * @param responseObserver the GRPC request StreamObserver
+ */
+private[connect] class ExecutePlanResponseSender(
+  grpcObserver: StreamObserver[ExecutePlanResponse]) extends Logging {
+
+  private var detached = false
+
+  /** Detach this sender from executionObserver.
+   *  Called only from executionObserver that this sender is attached to.
+   *  executionObserver holds lock, and needs to notify after this call. */
+  def detach(): Unit = {
+    if (detached == true) {
+      throw new IllegalStateException("ExecutePlanResponseSender already detached!")
+    }
+    detached = true
+  }
+
+  /**
+   * Receive responses from executionObserver and send them to grpcObserver.
+   * @param lastSentIndex Start sending the stream from response after this.

Review Comment:
   ```suggestion
      * @param lastSentIndex Start sending the stream from response after this.
   *                      The lastSentIndex indicates the last response seen by
   *                      the consumer.
   ```





[GitHub] [spark] juliuszsompolski commented on pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #41315:
URL: https://github.com/apache/spark/pull/41315#issuecomment-1635661028

   Before the lint change, CI was clean except for lint:
   https://github.com/juliuszsompolski/apache-spark/actions/runs/5551730507




[GitHub] [spark] juliuszsompolski commented on pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #41315:
URL: https://github.com/apache/spark/pull/41315#issuecomment-1635442834

   Resolved conflict.
   Previous CI run had 1 flaky test:
   ```
   2023-07-14T00:42:34.1310770Z [info] *** 1 TEST FAILED ***
   2023-07-14T00:42:34.1515215Z [error] Failed: Total 9369, Failed 1, Errors 0, Passed 9368, Ignored 27
   2023-07-14T00:42:34.1681702Z [error] Failed tests:
   2023-07-14T00:42:34.1682533Z [error] 	org.apache.spark.sql.execution.streaming.MicroBatchExecutionSuite
   ```




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1263499708


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -51,25 +51,24 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   // Mapping from id to StreamingQueryListener. Used for methods like removeListener() in
   // StreamingQueryManager.
   private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] =
-    new ConcurrentHashMap()
-
-  private[connect] def createExecutePlanHolder(
-      request: proto.ExecutePlanRequest): ExecutePlanHolder = {
+  new ConcurrentHashMap()

Review Comment:
   ```suggestion
       new ConcurrentHashMap()
   ```





[GitHub] [spark] hvanhovell commented on a diff in pull request #41315: [SPARK-43755][CONNECT] Move execution out of SparkExecutePlanStreamHandler and to a different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #41315:
URL: https://github.com/apache/spark/pull/41315#discussion_r1213511276


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.execution
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.ByteString
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
+import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.connect.service.ExecutionHolder
+import org.apache.spark.sql.execution.{LocalTableScanExec, SQLExecution}
+import org.apache.spark.sql.execution.arrow.ArrowConverters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Handle ExecutePlanRequest where the operatoin to handle is Plan execution of type

Review Comment:
   operatoin -> operation


