Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/07/24 07:07:11 UTC

[GitHub] [spark] LuciferYang commented on a diff in pull request #42009: [SPARK-44422][CONNECT] Spark Connect fine grained interrupt

LuciferYang commented on code in PR #42009:
URL: https://github.com/apache/spark/pull/42009#discussion_r1271836248


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala:
##########
@@ -96,5 +103,148 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2")
     finished = true
     assert(ThreadUtils.awaitResult(interruptor, 10.seconds))
+    assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
+  }
+
+  test("interrupt tag") {
+    val session = spark
+    import session.implicits._
+
+    // The global ExecutionContext has only 2 threads in Apache Spark CI,
+    // so create our own thread pool for the four Futures used in this test.
+    val numThreads = 4
+    val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads)
+    val executionContext = ExecutionContext.fromExecutorService(fpool)
+
+    val q1 = Future {
+      assert(spark.getTags() == Set())
+      spark.addTag("two")
+      assert(spark.getTags() == Set("two"))
+      spark.clearTags() // check that clearing all tags works
+      assert(spark.getTags() == Set())
+      spark.addTag("one")
+      assert(spark.getTags() == Set("one"))
+      try {
+        spark
+          .range(10)
+          .map(n => {
+            Thread.sleep(30000); n
+          })
+          .collect()
+      } finally {
+        spark.clearTags() // clear for the case of thread reuse by another Future
+      }
+    }(executionContext)
+    val q2 = Future {
+      assert(spark.getTags() == Set())
+      spark.addTag("one")
+      spark.addTag("two")
+      spark.addTag("one")
+      spark.addTag("two") // duplicates shouldn't matter
+      assert(spark.getTags() == Set("one", "two"))
+      try {
+        spark
+          .range(10)
+          .map(n => {
+            Thread.sleep(30000); n
+          })
+          .collect()
+      } finally {
+        spark.clearTags() // clear for the case of thread reuse by another Future
+      }
+    }(executionContext)
+    val q3 = Future {
+      assert(spark.getTags() == Set())
+      spark.addTag("foo")
+      spark.removeTag("foo")
+      assert(spark.getTags() == Set()) // check that removing the last tag works
+      spark.addTag("two")
+      assert(spark.getTags() == Set("two"))
+      try {
+        spark
+          .range(10)
+          .map(n => {
+            Thread.sleep(30000); n
+          })
+          .collect()
+      } finally {
+        spark.clearTags() // clear for the case of thread reuse by another Future
+      }
+    }(executionContext)
+    val q4 = Future {
+      assert(spark.getTags() == Set())
+      spark.addTag("one")
+      spark.addTag("two")
+      spark.addTag("two")
+      assert(spark.getTags() == Set("one", "two"))
+      spark.removeTag("two") // check that remove works, despite duplicate add
+      assert(spark.getTags() == Set("one"))
+      try {
+        spark
+          .range(10)
+          .map(n => {
+            Thread.sleep(30000); n
+          })
+          .collect()
+      } finally {
+        spark.clearTags() // clear for the case of thread reuse by another Future
+      }
+    }(executionContext)
+    val interrupted = mutable.ListBuffer[String]()
+
+    // q2 and q3 should be cancelled
+    interrupted.clear()
+    eventually(timeout(20.seconds), interval(1.seconds)) {
+      val ids = spark.interruptTag("two")
+      interrupted ++= ids
+      assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
+    }
+    val e2 = intercept[SparkException] {
+      ThreadUtils.awaitResult(q2, 1.minute)
+    }
+    assert(e2.getCause.getMessage contains "OPERATION_CANCELED")
+    val e3 = intercept[SparkException] {
+      ThreadUtils.awaitResult(q3, 1.minute)
+    }
+    assert(e3.getCause.getMessage contains "OPERATION_CANCELED")
+    assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
+
+    // q1 and q4 should be cancelled
+    interrupted.clear()
+    eventually(timeout(20.seconds), interval(1.seconds)) {
+      val ids = spark.interruptTag("one")
+      interrupted ++= ids
+      assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
+    }
+    val e1 = intercept[SparkException] {
+      ThreadUtils.awaitResult(q1, 1.minute)
+    }
+    assert(e1.getCause.getMessage contains "OPERATION_CANCELED")
+    val e4 = intercept[SparkException] {
+      ThreadUtils.awaitResult(q4, 1.minute)
+    }
+    assert(e4.getCause.getMessage contains "OPERATION_CANCELED")
+    assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
+  }
+
+  test("interrupt operation") {

Review Comment:
   @juliuszsompolski after this PR, the Maven tests for `SparkSessionE2ESuite` started failing:
   
   ```
   build/mvn clean install -DskipTests -Phive  
   build/mvn clean test -pl connector/connect/client/jvm
   ``` 
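   
   To iterate on just this suite, the scalatest-maven-plugin suite selector should also work (a suggestion, assuming the standard Spark Maven setup):
   
   ```
   build/mvn test -pl connector/connect/client/jvm -Dtest=none \
     -DwildcardSuites=org.apache.spark.sql.SparkSessionE2ESuite
   ```
   
   Either way, the run fails with: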
   
   ```
   SparkSessionE2ESuite:
   - interrupt all - background queries, foreground interrupt *** FAILED ***
     The code passed to eventually never returned normally. Attempted 30 times over 20.274941708000004 seconds. Last failure message: Some("unexpected failure in q2: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult") was not empty Error not empty: Some(unexpected failure in q2: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult). (SparkSessionE2ESuite.scala:69)
   - interrupt all - foreground queries, background interrupt *** FAILED ***
     "org/apache/spark/sql/connect/client/SparkResult" did not contain "OPERATION_CANCELED" Unexpected exception: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult (SparkSessionE2ESuite.scala:99)
   - interrupt tag *** FAILED ***
     The code passed to eventually never returned normally. Attempted 30 times over 20.256318458000003 seconds. Last failure message: ListBuffer() had length 0 instead of expected length 2 Interrupted operations: ListBuffer().. (SparkSessionE2ESuite.scala:197)
   - interrupt operation *** FAILED ***
     org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:82)
     at org.apache.spark.sql.connect.client.SparkResult.operationId(SparkResult.scala:173)
     at org.apache.spark.sql.SparkSessionE2ESuite.$anonfun$new$31(SparkSessionE2ESuite.scala:241)
     at org.apache.spark.sql.connect.client.util.RemoteSparkSession.$anonfun$test$1(RemoteSparkSession.scala:235)
     at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
     at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
     at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
     ...
   
   ```
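   
   The failure text has the shape of a `NoClassDefFoundError` message: when the JVM cannot load a class, the error's message is just the slash-separated internal class name, which would explain the bare `org/apache/spark/sql/connect/client/SparkResult` in the wrapped `SparkException`. A minimal Scala illustration of that shape (hypothetical, not Spark code):
   
   ```scala
   // A JVM-raised NoClassDefFoundError carries only the slash-separated
   // internal name of the missing class as its message.
   val err = new NoClassDefFoundError("org/apache/spark/sql/connect/client/SparkResult")
   assert(err.getMessage == "org/apache/spark/sql/connect/client/SparkResult")
   ```
   
   If that reading is right, this points to a client-side classpath/shading problem in the Maven build rather than a genuine OPERATION_CANCELED from the server.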



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

