You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/14 16:05:25 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4830][TEST] Fix flaky test "support to interrupt the thrift request if remote engine is broken"

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new daf62eaa7 [KYUUBI #4830][TEST] Fix flaky test "support to interrupt the thrift request if remote engine is broken"
daf62eaa7 is described below

commit daf62eaa706851734a5f18a1a981a7e26a468ed6
Author: wforget <64...@qq.com>
AuthorDate: Mon May 15 00:04:39 2023 +0800

    [KYUUBI #4830][TEST] Fix flaky test "support to interrupt the thrift request if remote engine is broken"
    
    ### _Why are the changes needed?_
    
    Move 'support to interrupt the thrift request if remote engine is broken' ut to `KyuubiOperationPerConnectionSuite` and fix it.
    
    close #4830
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4833 from wForget/KYUUBI-4830.
    
    Closes #4830
    
    838021175 [wforget] test
    214d22559 [wforget] fix
    c7b96f0e2 [wforget] fix
    fbf837ffd [wforget] fix
    e6ada6af9 [wforget] [KYUUBI #4830] Fix flaky test: KyuubiOperationPerUserSuite: max result rows
    
    Authored-by: wforget <64...@qq.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 272673a41f9908a004fd9e59d57470f476f5369e)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../KyuubiOperationPerConnectionSuite.scala        | 51 +++++++++++++++++++++-
 .../operation/KyuubiOperationPerUserSuite.scala    | 48 +-------------------
 2 files changed, 51 insertions(+), 48 deletions(-)

diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 6c22664fe..83124a456 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.jdbc.KyuubiHiveDriver
 import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.plugin.SessionConfAdvisor
-import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle, SessionType}
 
 /**
  * UT with Connection level engine shared cost much time, only run basic jdbc tests.
@@ -290,6 +290,55 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
       assert(e.getMessage.contains("client should catch this exception"))
     }
   }
+
+  test("support to interrupt the thrift request if remote engine is broken") {
+    withSessionConf(Map(
+      KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
+      KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
+      KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
+      Map.empty) {
+      withSessionHandle { (client, handle) =>
+        val preReq = new TExecuteStatementReq()
+        preReq.setStatement("select engine_name()")
+        preReq.setSessionHandle(handle)
+        preReq.setRunAsync(false)
+        client.ExecuteStatement(preReq)
+
+        val sessionHandle = SessionHandle(handle)
+        val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+          .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
+
+        val exitReq = new TExecuteStatementReq()
+        exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
+          "java_method('java.lang.System', 'exit', 1)")
+        exitReq.setSessionHandle(handle)
+        exitReq.setRunAsync(true)
+        client.ExecuteStatement(exitReq)
+
+        session.sessionManager.getConf
+          .set(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL, 3000L)
+
+        val executeStmtReq = new TExecuteStatementReq()
+        executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
+        executeStmtReq.setSessionHandle(handle)
+        executeStmtReq.setRunAsync(false)
+        val startTime = System.currentTimeMillis()
+        val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+        assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
+        assert(executeStmtResp.getStatus.getErrorMessage.contains(
+          "java.net.SocketException") ||
+          executeStmtResp.getStatus.getErrorMessage.contains(
+            "org.apache.thrift.transport.TTransportException") ||
+          executeStmtResp.getStatus.getErrorMessage.contains(
+            "connection does not exist"))
+        val elapsedTime = System.currentTimeMillis() - startTime
+        assert(elapsedTime < 20 * 1000)
+        eventually(timeout(3.seconds)) {
+          assert(session.client.asyncRequestInterrupted)
+        }
+      }
+    }
+  }
 }
 
 class TestSessionConfAdvisor extends SessionConfAdvisor {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index c9c5ec8ba..a1eeec031 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -29,7 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
 import org.apache.kyuubi.engine.SemanticVersion
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
-import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, SessionHandle}
 import org.apache.kyuubi.zookeeper.ZookeeperConf
 
 class KyuubiOperationPerUserSuite
@@ -166,52 +166,6 @@ class KyuubiOperationPerUserSuite
     assert(r1 !== r2)
   }
 
-  test("support to interrupt the thrift request if remote engine is broken") {
-    assume(!httpMode)
-    withSessionConf(Map(
-      KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
-      KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
-      KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
-      Map.empty) {
-      withSessionHandle { (client, handle) =>
-        val preReq = new TExecuteStatementReq()
-        preReq.setStatement("select engine_name()")
-        preReq.setSessionHandle(handle)
-        preReq.setRunAsync(false)
-        client.ExecuteStatement(preReq)
-
-        val sessionHandle = SessionHandle(handle)
-        val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
-          .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
-        session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close())
-
-        val exitReq = new TExecuteStatementReq()
-        exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
-          "java_method('java.lang.System', 'exit', 1)")
-        exitReq.setSessionHandle(handle)
-        exitReq.setRunAsync(true)
-        client.ExecuteStatement(exitReq)
-
-        val executeStmtReq = new TExecuteStatementReq()
-        executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
-        executeStmtReq.setSessionHandle(handle)
-        executeStmtReq.setRunAsync(false)
-        val startTime = System.currentTimeMillis()
-        val executeStmtResp = client.ExecuteStatement(executeStmtReq)
-        assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
-        assert(executeStmtResp.getStatus.getErrorMessage.contains(
-          "java.net.SocketException") ||
-          executeStmtResp.getStatus.getErrorMessage.contains(
-            "org.apache.thrift.transport.TTransportException") ||
-          executeStmtResp.getStatus.getErrorMessage.contains(
-            "connection does not exist"))
-        val elapsedTime = System.currentTimeMillis() - startTime
-        assert(elapsedTime < 20 * 1000)
-        assert(session.client.asyncRequestInterrupted)
-      }
-    }
-  }
-
   test("max result rows") {
     Seq("true", "false").foreach { incremental =>
       Seq("thrift", "arrow").foreach { resultFormat =>