Posted to reviews@spark.apache.org by "zhenlineo (via GitHub)" <gi...@apache.org> on 2023/05/02 16:19:06 UTC

[GitHub] [spark] zhenlineo commented on a diff in pull request #41005: [SPARK-43267][CONNECT] Add Spark Connect SparkSession.interruptAll

zhenlineo commented on code in PR #41005:
URL: https://github.com/apache/spark/pull/41005#discussion_r1182746021


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -184,6 +184,16 @@ private[sql] class SparkConnectClient(
     analyze(request)
   }
 
+  private[sql] def interruptAll(): proto.InterruptResponse = {
+    val builder = proto.InterruptRequest.newBuilder()

Review Comment:
   We use the same session to kill its own on-going operations.
   Trying to understand gRPC:
   Could the client/session be blocked by something (e.g. streaming data, a deadlock, GC, etc.) so that we cannot send the interrupt to the other side immediately? Does gRPC guarantee there will be a free channel to send this request?
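
   To make the scenario concrete, here is roughly what I have in mind (the connection string, builder calls and timings are illustrative only, not taken from this PR):

   ```scala
   import java.util.concurrent.Executors

   import org.apache.spark.sql.SparkSession

   object InterruptWhileBlocked {
     def main(args: Array[String]): Unit = {
       // Client-side session from the Connect Scala client (illustrative connection string).
       val spark = SparkSession.builder().remote("sc://localhost").build()
       val pool = Executors.newSingleThreadExecutor()

       // Thread 1: keeps an ExecutePlan stream busy for a while.
       pool.submit(new Runnable {
         override def run(): Unit = spark.range(1000000000L).repartition(200).count()
       })

       // Main thread: interruptAll() travels as a separate RPC over the same channel.
       // The question is whether gRPC/HTTP2 multiplexing guarantees this call gets
       // through promptly even when the client is busy consuming the result stream,
       // stuck in GC, or throttled by flow control.
       Thread.sleep(1000)
       spark.interruptAll()

       pool.shutdown()
       spark.close()
     }
   }
   ```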



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.UUID
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+
+import collection.JavaConverters._
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Object used to hold the Spark Connect session state.
+ */
+case class SessionHolder(
+  userId: String,
+  sessionId: String,
+  session: SparkSession) {
+
+  val executePlanOperations: ConcurrentMap[String, ExecutePlanHolder] =
+    new ConcurrentHashMap[String, ExecutePlanHolder]()
+
+  private[connect] def createExecutePlanHolder(
+      request: proto.ExecutePlanRequest): ExecutePlanHolder = {
+
+    val operationId = UUID.randomUUID().toString
+    val executePlanHolder = ExecutePlanHolder(operationId, this, request)
+    assert(executePlanOperations.putIfAbsent(operationId, executePlanHolder) == null)
+    executePlanHolder
+  }
+
+  private[connect] def removeExecutePlanHolder(operationId: String): Unit = {
+    executePlanOperations.remove(operationId)
+  }
+
+  private[connect] def interruptAll(): Unit = {
+    executePlanOperations.asScala.values.foreach(_.interrupt())

Review Comment:
   It feels safer to have `_.interrupt()` wrapped in a try/catch clause.
   Can you also add some of your nice explanation to the code comments?
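
   Something along these lines is what I mean, assuming `SessionHolder` mixes in `Logging` (otherwise log through whatever is available) and with `scala.util.control.NonFatal` imported:

   ```scala
   private[connect] def interruptAll(): Unit = {
     executePlanOperations.asScala.values.foreach { holder =>
       try {
         holder.interrupt()
       } catch {
         case NonFatal(e) =>
           // Keep interrupting the remaining operations even if one of them fails.
           logWarning(s"Failed to interrupt operation ${holder.operationId}", e)
       }
     }
   }
   ```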



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -47,34 +48,49 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     extends Logging {
 
   def handle(v: ExecutePlanRequest): Unit = SparkConnectArtifactManager.withArtifactClassLoader {
-    val session =
-      SparkConnectService
-        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
-        .session
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+    val session = sessionHolder.session
+
     session.withActive {
+      val debugString = try {
+        Utils.redact(
+          session.sessionState.conf.stringRedactionPattern,
+          ProtoUtils.abbreviate(v).toString)
+      } catch {
+        case NonFatal(e) =>
+          logWarning("Fail to extract debug information", e)
+          "UNKNOWN"
+      }
+
+      val executeHolder = sessionHolder.createExecutePlanHolder(v)
+      session.sparkContext.setJobGroup(
+        executeHolder.jobGroupId,
+        s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}",

Review Comment:
   I am a bit annoyed by this line in the console output:
   `Spark Connect - ${StringUtils.abbreviate(debugString, 128)}`
   Is it enough to only have the session id etc. here? This code produces a multi-line JSON output that contains no useful information beyond the session id, yet adds a lot of lines of `[`, `{`, etc.
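
   For example something like this would be enough for me (just a sketch; it assumes `ExecutePlanHolder` exposes the `operationId`):

   ```scala
   session.sparkContext.setJobGroup(
     executeHolder.jobGroupId,
     s"Spark Connect - userId=${v.getUserContext.getUserId}, " +
       s"sessionId=${v.getSessionId}, operationId=${executeHolder.operationId}",
     interruptOnCancel = true)
   ```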



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -47,34 +48,49 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     extends Logging {
 
   def handle(v: ExecutePlanRequest): Unit = SparkConnectArtifactManager.withArtifactClassLoader {
-    val session =
-      SparkConnectService
-        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
-        .session
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+    val session = sessionHolder.session
+
     session.withActive {
+      val debugString = try {
+        Utils.redact(
+          session.sessionState.conf.stringRedactionPattern,
+          ProtoUtils.abbreviate(v).toString)
+      } catch {
+        case NonFatal(e) =>
+          logWarning("Fail to extract debug information", e)
+          "UNKNOWN"
+      }
+
+      val executeHolder = sessionHolder.createExecutePlanHolder(v)
+      session.sparkContext.setJobGroup(
+        executeHolder.jobGroupId,
+        s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}",
+        interruptOnCancel = true)
 
-      // Add debug information to the query execution so that the jobs are traceable.
       try {
-        val debugString =
-          Utils.redact(
-            session.sessionState.conf.stringRedactionPattern,
-            ProtoUtils.abbreviate(v).toString)
+        // Add debug information to the query execution so that the jobs are traceable.
         session.sparkContext.setLocalProperty(
           "callSite.short",
           s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")

Review Comment:
   Same here.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -525,6 +525,19 @@ class SparkSession private[sql] (
     planIdGenerator.set(0)
   }
 
+  /**
+   * Interrupt all operations of this session currently running on the connected server.
+   *
+   * TODO/WIP: Currently it will interrupt the Spark Jobs running on the server, triggered from
+ * ExecutePlan requests. If an operation is not running a Spark Job, it becomes a noop and the

Review Comment:
   Will this be solved by adding a volatile boolean `isInterrupted` in a follow-up PR?
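
   Just to make sure I read the TODO correctly, I am picturing roughly this in `ExecutePlanHolder` (illustrative only, not code from this PR):

   ```scala
   class ExecutePlanHolder(val operationId: String) {

     @volatile private var interrupted: Boolean = false

     // Set by SessionHolder.interruptAll(), in addition to cancelling the job group as today.
     def interrupt(): Unit = {
       interrupted = true
     }

     // Operations that are not running a Spark job would poll this (e.g. between
     // response batches) so that they also stop promptly instead of being a noop.
     def isInterrupted: Boolean = interrupted
   }
   ```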



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

