You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/14 16:05:25 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4830][TEST] Fix flaky test "support to interrupt the thrift request if remote engine is broken"
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new daf62eaa7 [KYUUBI #4830][TEST] Fix flaky test "support to interrupt the thrift request if remote engine is broken"
daf62eaa7 is described below
commit daf62eaa706851734a5f18a1a981a7e26a468ed6
Author: wforget <64...@qq.com>
AuthorDate: Mon May 15 00:04:39 2023 +0800
[KYUUBI #4830][TEST] Fix flaky test "support to interrupt the thrift request if remote engine is broken"
### _Why are the changes needed?_
Move the unit test 'support to interrupt the thrift request if remote engine is broken' to `KyuubiOperationPerConnectionSuite` and fix its flakiness.
close #4830
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4833 from wForget/KYUUBI-4830.
Closes #4830
838021175 [wforget] test
214d22559 [wforget] fix
c7b96f0e2 [wforget] fix
fbf837ffd [wforget] fix
e6ada6af9 [wforget] [KYUUBI #4830] Fix flaky test: KyuubiOperationPerUserSuite: max result rows
Authored-by: wforget <64...@qq.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit 272673a41f9908a004fd9e59d57470f476f5369e)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../KyuubiOperationPerConnectionSuite.scala | 51 +++++++++++++++++++++-
.../operation/KyuubiOperationPerUserSuite.scala | 48 +-------------------
2 files changed, 51 insertions(+), 48 deletions(-)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 6c22664fe..83124a456 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.jdbc.KyuubiHiveDriver
import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.plugin.SessionConfAdvisor
-import org.apache.kyuubi.session.{KyuubiSessionManager, SessionType}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle, SessionType}
/**
* UT with Connection level engine shared cost much time, only run basic jdbc tests.
@@ -290,6 +290,55 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
assert(e.getMessage.contains("client should catch this exception"))
}
}
+
+ test("support to interrupt the thrift request if remote engine is broken") {
+ withSessionConf(Map(
+ KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
+ KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
+ KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
+ Map.empty) {
+ withSessionHandle { (client, handle) =>
+ val preReq = new TExecuteStatementReq()
+ preReq.setStatement("select engine_name()")
+ preReq.setSessionHandle(handle)
+ preReq.setRunAsync(false)
+ client.ExecuteStatement(preReq)
+
+ val sessionHandle = SessionHandle(handle)
+ val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+ .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
+
+ val exitReq = new TExecuteStatementReq()
+ exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
+ "java_method('java.lang.System', 'exit', 1)")
+ exitReq.setSessionHandle(handle)
+ exitReq.setRunAsync(true)
+ client.ExecuteStatement(exitReq)
+
+ session.sessionManager.getConf
+ .set(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL, 3000L)
+
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val startTime = System.currentTimeMillis()
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
+ assert(executeStmtResp.getStatus.getErrorMessage.contains(
+ "java.net.SocketException") ||
+ executeStmtResp.getStatus.getErrorMessage.contains(
+ "org.apache.thrift.transport.TTransportException") ||
+ executeStmtResp.getStatus.getErrorMessage.contains(
+ "connection does not exist"))
+ val elapsedTime = System.currentTimeMillis() - startTime
+ assert(elapsedTime < 20 * 1000)
+ eventually(timeout(3.seconds)) {
+ assert(session.client.asyncRequestInterrupted)
+ }
+ }
+ }
+ }
}
class TestSessionConfAdvisor extends SessionConfAdvisor {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index c9c5ec8ba..a1eeec031 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -29,7 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
-import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, SessionHandle}
import org.apache.kyuubi.zookeeper.ZookeeperConf
class KyuubiOperationPerUserSuite
@@ -166,52 +166,6 @@ class KyuubiOperationPerUserSuite
assert(r1 !== r2)
}
- test("support to interrupt the thrift request if remote engine is broken") {
- assume(!httpMode)
- withSessionConf(Map(
- KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
- KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
- KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
- Map.empty) {
- withSessionHandle { (client, handle) =>
- val preReq = new TExecuteStatementReq()
- preReq.setStatement("select engine_name()")
- preReq.setSessionHandle(handle)
- preReq.setRunAsync(false)
- client.ExecuteStatement(preReq)
-
- val sessionHandle = SessionHandle(handle)
- val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
- .getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
- session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close())
-
- val exitReq = new TExecuteStatementReq()
- exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
- "java_method('java.lang.System', 'exit', 1)")
- exitReq.setSessionHandle(handle)
- exitReq.setRunAsync(true)
- client.ExecuteStatement(exitReq)
-
- val executeStmtReq = new TExecuteStatementReq()
- executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
- executeStmtReq.setSessionHandle(handle)
- executeStmtReq.setRunAsync(false)
- val startTime = System.currentTimeMillis()
- val executeStmtResp = client.ExecuteStatement(executeStmtReq)
- assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
- assert(executeStmtResp.getStatus.getErrorMessage.contains(
- "java.net.SocketException") ||
- executeStmtResp.getStatus.getErrorMessage.contains(
- "org.apache.thrift.transport.TTransportException") ||
- executeStmtResp.getStatus.getErrorMessage.contains(
- "connection does not exist"))
- val elapsedTime = System.currentTimeMillis() - startTime
- assert(elapsedTime < 20 * 1000)
- assert(session.client.asyncRequestInterrupted)
- }
- }
- }
-
test("max result rows") {
Seq("true", "false").foreach { incremental =>
Seq("thrift", "arrow").foreach { resultFormat =>