Posted to reviews@spark.apache.org by "cdkrot (via GitHub)" <gi...@apache.org> on 2023/08/17 14:54:50 UTC

[GitHub] [spark] cdkrot opened a new pull request, #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

cdkrot opened a new pull request, #42538:
URL: https://github.com/apache/spark/pull/42538

   ### What changes were proposed in this pull request?
   Implement a heartbeat for the Scala Spark Connect client. The client keeps a count of currently running queries and, while at least one such query is running, sends periodic "keep-alive" requests to the server from a separate background thread.
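
   A minimal sketch of the counting idea described above, under stated assumptions: `ping` is a hypothetical placeholder for whatever cheap request keeps the connection alive, and the interval is arbitrary. Only the names `beat`, `heartbeatLevel` and `pingCount` mirror the diff; this is not the PR's implementation.

   ```scala
   import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
   import scala.util.control.NonFatal

   // Hypothetical sketch; `ping` stands in for the real keep-alive RPC.
   class RefCountedHeartbeat(intervalMs: Long, ping: () => Unit) {
     private val level = new AtomicInteger(0) // queries currently running
     private val pings = new AtomicLong(0)    // keep-alive requests sent so far

     def heartbeatLevel: Int = level.get()
     def pingCount: Long = pings.get()

     // Count `fn` as a running query for as long as it executes, even if it throws.
     def beat[T](fn: => T): T = {
       level.incrementAndGet()
       try {
         fn
       } finally {
         level.decrementAndGet()
       }
     }

     // Background daemon thread: wake up every interval, ping only if a query is running.
     private val pingThread = new Thread(new Runnable {
       override def run(): Unit =
         try {
           while (true) {
             Thread.sleep(intervalMs)
             if (level.get() > 0) {
               try {
                 ping()
                 pings.incrementAndGet()
               } catch {
                 case NonFatal(_) => () // a failed ping must not kill the thread
               }
             }
           }
         } catch {
           case _: InterruptedException => () // close() was called
         }
     })
     pingThread.setDaemon(true)
     pingThread.start()

     def close(): Unit = pingThread.interrupt()
   }
   ```

   The `try`/`finally` in `beat` is what lets the count return to 0 when a query fails or is interrupted, which is what the E2E test asserts after `spark.interruptAll()`. The diff's `beatIterator` would presumably also need to keep the count raised until the returned `CloseableIterator` is exhausted or closed; that part is omitted here.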
   
   ### Why are the changes needed?
   
   We have observed that clients executing long-running requests (in particular, ExecutePlanRequests that take 1 hour or more) are often disconnected by intermediate proxy layers, such as those used by common cloud providers. More standard remedies, such as gRPC's own keepalive (heartbeat) mechanism, do not appear to help.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. This change enables the heartbeat by default.
   
   ### How was this patch tested?
   
   Unit and E2E tests. It was also verified separately that a query running for 1 hour 10 minutes completes successfully, for example:
   
   ```
   test("Alice") {
       val n: Long = 80e13.toLong
   
       val res = spark.range(n).count()
   
       assert(res == n)
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42538:
URL: https://github.com/apache/spark/pull/42538#discussion_r1297373621


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala:
##########


Review Comment:
   Ok





[GitHub] [spark] cdkrot commented on a diff in pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42538:
URL: https://github.com/apache/spark/pull/42538#discussion_r1297372981


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/Heartbeat.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This class contains an interface for Heartbeat functionality and the dummy implementation which
+ * doesn't do anything.
+ *
+ * The actual implementation is in HeartbeatImpl
+ */
+private[sql] class Heartbeat {
+  def beat[T](fn: => T): T = fn
+
+  def beatIterator[T](fn: => CloseableIterator[T]): CloseableIterator[T] = fn
+
+  def heartbeatLevel: Int = 0
+
+  def pingCount: Long = 0
+}
+
+/**
+ * This class implements Heartbeat. Heartbeat consists of sending a dummy request once in a while
+ * if there is at least one long-term query running.
+ *
+ * Otherwise the connection may disconnect before client receives the response back.
+ */
+private[sql] class HeartbeatImpl(bstub: CustomSparkConnectBlockingStub)
+    extends Heartbeat
+    with Logging {
+  val beatLevel: AtomicInteger = new AtomicInteger(0)
+  var count: AtomicLong = new AtomicLong(0)
+
+  override def heartbeatLevel: Int = beatLevel.get()
+
+  override def pingCount: Long = count.get()
+
+  private val pingThread = new Thread() {

Review Comment:
   Sure, I will look into it





[GitHub] [spark] hvanhovell commented on a diff in pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42538:
URL: https://github.com/apache/spark/pull/42538#discussion_r1297370004


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/Heartbeat.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This class contains an interface for Heartbeat functionality and the dummy implementation which
+ * doesn't do anything.
+ *
+ * The actual implementation is in HeartbeatImpl
+ */
+private[sql] class Heartbeat {
+  def beat[T](fn: => T): T = fn
+
+  def beatIterator[T](fn: => CloseableIterator[T]): CloseableIterator[T] = fn
+
+  def heartbeatLevel: Int = 0
+
+  def pingCount: Long = 0
+}
+
+/**
+ * This class implements Heartbeat. Heartbeat consists of sending a dummy request once in a while
+ * if there is at least one long-term query running.
+ *
+ * Otherwise the connection may disconnect before client receives the response back.
+ */
+private[sql] class HeartbeatImpl(bstub: CustomSparkConnectBlockingStub)
+    extends Heartbeat
+    with Logging {
+  val beatLevel: AtomicInteger = new AtomicInteger(0)
+  var count: AtomicLong = new AtomicLong(0)
+
+  override def heartbeatLevel: Int = beatLevel.get()
+
+  override def pingCount: Long = count.get()
+
+  private val pingThread = new Thread() {

Review Comment:
   Why not use an ExecutorService and schedule a runnable?
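
   For comparison, a minimal sketch of this suggestion, assuming the same hypothetical `ping` placeholder and a fixed interval (neither comes from the PR): a single-threaded `ScheduledExecutorService` owns the timing loop, so there is no hand-written sleep/loop to maintain and shutdown is a single `shutdownNow()` call.

   ```scala
   import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
   import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
   import scala.util.control.NonFatal

   // Hypothetical sketch of the reviewer's suggestion; `ping` is a placeholder.
   class ScheduledHeartbeat(intervalMs: Long, ping: () => Unit) {
     private val level = new AtomicInteger(0)
     private val pings = new AtomicLong(0)

     private val scheduler: ScheduledExecutorService =
       Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
         override def newThread(r: Runnable): Thread = {
           val t = new Thread(r, "heartbeat")
           t.setDaemon(true) // do not keep the JVM alive
           t
         }
       })

     // The scheduler runs this every `intervalMs`; it pings only while a query is running.
     scheduler.scheduleAtFixedRate(
       new Runnable {
         override def run(): Unit =
           if (level.get() > 0) {
             try {
               ping()
               pings.incrementAndGet()
             } catch {
               // An exception escaping run() would cancel all future executions.
               case NonFatal(_) => ()
             }
           }
       },
       intervalMs,
       intervalMs,
       TimeUnit.MILLISECONDS)

     def heartbeatLevel: Int = level.get()
     def pingCount: Long = pings.get()

     def beat[T](fn: => T): T = {
       level.incrementAndGet()
       try {
         fn
       } finally {
         level.decrementAndGet()
       }
     }

     def close(): Unit = scheduler.shutdownNow()
   }
   ```

   The `NonFatal` catch inside `run()` matters more here than with a hand-written thread: per the `ScheduledExecutorService` contract, an exception escaping one execution suppresses all subsequent ones, so a single failed ping would otherwise silently stop the heartbeat.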





Re: [PR] [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect
URL: https://github.com/apache/spark/pull/42538




[GitHub] [spark] cdkrot commented on a diff in pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42538:
URL: https://github.com/apache/spark/pull/42538#discussion_r1297368891


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala:
##########
@@ -106,6 +108,110 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     finished = true
     assert(awaitResult(interruptor, 10.seconds))
     assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
+
+    assert(spark.client.heartbeatLevel == 0)
+  }
+
+  test("Test heartbeat with simple operations") {
+    val n: Long = 10
+    val cnt = spark.range(0, n, 1, numPartitions = 1).count()
+    assert(cnt == n)
+
+    spark
+      .sql("select val from (values ('Hello'), ('World')) as t(val)", Map.empty[String, String])
+      .collect()
+
+    spark
+      .sql("select val from (values ('Hello'), ('World')) as t(val)", Array.empty)
+      .collect()
+
+    assert(spark.client.heartbeatLevel == 0)
+  }
+
+  test("Test heartbeat with long operation") {
+    val session = spark
+    import session.implicits._
+    implicit val ec: ExecutionContextExecutor = ExecutionContext.global
+
+    Future {
+      assertThrows[SparkException] {
+        spark
+          .range(10)
+          .map(n => {
+            Thread.sleep(30000);
+            n
+          })
+          .collect()
+      }
+    }
+
+    // wait until query execution kicks-off
+    eventually(timeout(10.seconds), interval(100.millis)) {
+      assert(spark.client.heartbeatLevel == 1)
+    }
+
+    eventually(timeout(10.seconds), interval(100.millis)) {
+      spark.interruptAll()
+      assert(spark.client.heartbeatLevel == 0)
+    }
+  }
+
+  test("Test heartbeat gets executed") {
+    // Importing here to avoid extra edge changes in imports section

Review Comment:
   Just did :)





[GitHub] [spark] hvanhovell commented on a diff in pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42538:
URL: https://github.com/apache/spark/pull/42538#discussion_r1297368070


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala:
##########


Review Comment:
   How much time do these tests add to the E2E suite?





Re: [PR] [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #42538:
URL: https://github.com/apache/spark/pull/42538#issuecomment-1826450643

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] hvanhovell commented on a diff in pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42538:
URL: https://github.com/apache/spark/pull/42538#discussion_r1297368407


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala:
##########
@@ -106,6 +108,110 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     finished = true
     assert(awaitResult(interruptor, 10.seconds))
     assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
+
+    assert(spark.client.heartbeatLevel == 0)
+  }
+
+  test("Test heartbeat with simple operations") {
+    val n: Long = 10
+    val cnt = spark.range(0, n, 1, numPartitions = 1).count()
+    assert(cnt == n)
+
+    spark
+      .sql("select val from (values ('Hello'), ('World')) as t(val)", Map.empty[String, String])
+      .collect()
+
+    spark
+      .sql("select val from (values ('Hello'), ('World')) as t(val)", Array.empty)
+      .collect()
+
+    assert(spark.client.heartbeatLevel == 0)
+  }
+
+  test("Test heartbeat with long operation") {
+    val session = spark
+    import session.implicits._
+    implicit val ec: ExecutionContextExecutor = ExecutionContext.global
+
+    Future {
+      assertThrows[SparkException] {
+        spark
+          .range(10)
+          .map(n => {
+            Thread.sleep(30000);
+            n
+          })
+          .collect()
+      }
+    }
+
+    // wait until query execution kicks-off
+    eventually(timeout(10.seconds), interval(100.millis)) {
+      assert(spark.client.heartbeatLevel == 1)
+    }
+
+    eventually(timeout(10.seconds), interval(100.millis)) {
+      spark.interruptAll()
+      assert(spark.client.heartbeatLevel == 0)
+    }
+  }
+
+  test("Test heartbeat gets executed") {
+    // Importing here to avoid extra edge changes in imports section

Review Comment:
   Remove this? And just make it an import.





[GitHub] [spark] cdkrot commented on a diff in pull request #42538: [SPARK-44850][CONNECT] Heartbeat in scala's Spark Connect

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42538:
URL: https://github.com/apache/spark/pull/42538#discussion_r1297369752


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala:
##########


Review Comment:
   < 5 seconds


