Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/07/13 11:34:09 UTC

[incubator-kyuubi] branch master updated: [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine (#780)

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 082b182  [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine (#780)
082b182 is described below

commit 082b182757f4f5a6aa6a3936dc4beb5945afd964
Author: timothy65535 <86...@users.noreply.github.com>
AuthorDate: Tue Jul 13 19:34:03 2021 +0800

    [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine (#780)
    
    * [KYUUBI 770] Fix flaky test: Add config to control if cancel invoke interrupt task on engine
---
 .../kyuubi/engine/spark/IndividualSparkSuite.scala | 33 ++++++++++++++--------
 1 file changed, 22 insertions(+), 11 deletions(-)
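
For readers skimming the patch: the old assertions compared wall-clock time against
taskEnd.taskInfo.launchTime, which is what made the test flaky. The fix counts listener
events instead. A minimal sketch of the arithmetic the new assertions encode (illustrative
Scala only, not Kyuubi code):

    object ExpectedEvents {
      // Why the test expects 3 listener events with force cancel and 2 without.
      def expectedCount(forceCancel: Boolean): Int = {
        val jobStart = 1                            // onJobStart: the query's job launched
        val jobEnd = 1                              // onJobEnd: fires even when the job is cancelled
        val taskKilled = if (forceCancel) 1 else 0  // onTaskEnd(TaskKilled): only if the task is interrupted
        jobStart + jobEnd + taskKilled
      }

      def main(args: Array[String]): Unit = {
        assert(expectedCount(forceCancel = true) == 3)
        assert(expectedCount(forceCancel = false) == 2)
      }
    }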

diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala
index 829eaf7..4fe2cf6 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/IndividualSparkSuite.scala
@@ -21,7 +21,7 @@ import java.sql.{SQLTimeoutException, Statement}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import org.apache.spark.TaskKilled
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
 import org.apache.spark.sql.SparkSession
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
@@ -41,26 +41,37 @@ class SparkEngineSuites extends KyuubiFunSuite {
           val listener = new SparkListener {
             override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
               assert(taskEnd.reason.isInstanceOf[TaskKilled])
-              if (forceCancel.get()) {
-                assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 3000)
-                index.incrementAndGet()
-              } else {
-                assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 4000)
-                index.incrementAndGet()
-              }
+              // Reached only when OPERATION_FORCE_CANCEL is true: cancelling the job also
+              // interrupts the running task, which then ends here with a TaskKilled reason
+              index.incrementAndGet()
+            }
+
+            override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+              // Fired when the query's job starts, i.e. the statement actually began executing
+              index.incrementAndGet()
+            }
+
+            override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+              // Always fired, whether or not the job's tasks are interrupted
+              index.incrementAndGet()
             }
           }
 
           spark.sparkContext.addSparkListener(listener)
           try {
-            statement.setQueryTimeout(3)
+            statement.setQueryTimeout(5)
             forceCancel.set(force)
             val e1 = intercept[SQLTimeoutException] {
-              statement.execute("select java_method('java.lang.Thread', 'sleep', 5000L)")
+              statement.execute("select java_method('java.lang.Thread', 'sleep', 500000L)")
             }.getMessage
             assert(e1.contains("Query timed out"))
+            assert(index.get() != 0, "The query statement was not executed.")
             eventually(Timeout(30.seconds)) {
-              assert(index.get() == 1)
+              if (forceCancel.get()) {
+                assert(index.get() == 3)
+              } else {
+                assert(index.get() == 2)
+              }
             }
           } finally {
             spark.sparkContext.removeSparkListener(listener)
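
The behavior under test, whether cancelling a query also interrupts its running Spark
tasks, maps onto Spark's job-group API. A minimal standalone sketch, assuming the engine
wires its force-cancel flag through to interruptOnCancel (this patch only touches the
test, so the wiring shown here is an assumption, not Kyuubi code):

    import org.apache.spark.sql.SparkSession

    object InterruptOnCancelSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
        val sc = spark.sparkContext

        // interruptOnCancel = true: cancelJobGroup also calls Thread.interrupt() on the
        // running task threads, so a sleeping task dies immediately instead of sleeping on.
        sc.setJobGroup("demo", "cancellable query", interruptOnCancel = true)

        val runner = new Thread(() => {
          // Each task sleeps far longer than any caller would wait.
          try sc.parallelize(1 to 2, 2).foreach(_ => Thread.sleep(500000L))
          catch { case _: Exception => () } // cancellation surfaces as a SparkException
        })
        runner.start()

        Thread.sleep(2000) // crude wait for the tasks to launch; fine for a sketch
        sc.cancelJobGroup("demo") // with interruptOnCancel = false the sleeps keep running
        runner.join()
        spark.stop()
      }
    }

With interruptOnCancel set to false, cancelJobGroup only marks the tasks as killed; the
sleeping task threads are left alone, which is why the non-force branch of the test never
sees an onTaskEnd event within its timeout and expects a count of 2 rather than 3.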