You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/07/13 11:34:09 UTC
[incubator-kyuubi] branch master updated: [KYUUBI 770] Fix flaky
test: Add config to control if cancel invoke interrupt task on engine
(#780)
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 082b182 [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine (#780)
082b182 is described below
commit 082b182757f4f5a6aa6a3936dc4beb5945afd964
Author: timothy65535 <86...@users.noreply.github.com>
AuthorDate: Tue Jul 13 19:34:03 2021 +0800
[KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine (#780)
* [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine
---
.../kyuubi/engine/spark/IndividualSparkSuite.scala | 33 ++++++++++++++--------
1 file changed, 22 insertions(+), 11 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala
index 829eaf7..4fe2cf6 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala
@@ -21,7 +21,7 @@ import java.sql.{SQLTimeoutException, Statement}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import org.apache.spark.TaskKilled
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
@@ -41,26 +41,37 @@ class SparkEngineSuites extends KyuubiFunSuite {
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
assert(taskEnd.reason.isInstanceOf[TaskKilled])
- if (forceCancel.get()) {
- assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 3000)
- index.incrementAndGet()
- } else {
- assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 4000)
- index.incrementAndGet()
- }
+ // Reached only when OPERATION_FORCE_CANCEL is true and the running task is
+ // actually killed by the cancel; counts the interrupted task.
+ index.incrementAndGet()
+ }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ // A job start means the query statement has actually begun executing.
+ index.incrementAndGet()
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ // Always executed, whether or not the task was interrupted.
+ index.incrementAndGet()
}
}
spark.sparkContext.addSparkListener(listener)
try {
- statement.setQueryTimeout(3)
+ statement.setQueryTimeout(5)
forceCancel.set(force)
val e1 = intercept[SQLTimeoutException] {
- statement.execute("select java_method('java.lang.Thread', 'sleep', 5000L)")
+ statement.execute("select java_method('java.lang.Thread', 'sleep', 500000L)")
}.getMessage
assert(e1.contains("Query timed out"))
+ assert(index.get() != 0, "The query statement was not executed.")
eventually(Timeout(30.seconds)) {
- assert(index.get() == 1)
+ if (forceCancel.get()) {
+ assert(index.get() == 3)
+ } else {
+ assert(index.get() == 2)
+ }
}
} finally {
spark.sparkContext.removeSparkListener(listener)