You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/02 05:41:13 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2802] Retry opening the TSocket in KyuubiSyncThriftClient
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 21845266b [KYUUBI #2802] Retry opening the TSocket in KyuubiSyncThriftClient
21845266b is described below
commit 21845266b3ba80494226039c08758ec506656503
Author: Mahmoud Bahaa <ma...@incorta.com>
AuthorDate: Thu Jun 2 13:41:06 2022 +0800
[KYUUBI #2802] Retry opening the TSocket in KyuubiSyncThriftClient
### _Why are the changes needed?_
the new option: **kyuubi.operation.thrift.client.request.max.attempts** is not respected in createTProtocol in KyuubiSyncThriftClient thus it fails from the first time and doesn't retry even if **kyuubi.operation.thrift.client.request.max.attempts** was geater than 1. so adjusted the code to allow retry for that case. (My scala is kinda rusty so forgive if there is some uncleanness in the code)
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #2802 from mahmoudbahaa/expand-thrift-max-attempt-usage.
Closes #2802
bab6e835c [Mahmoud Bahaa] Retry opening the TSocket in KyuubiSyncThriftClient using kyuubi.operation.thrift.client.request.max.attempts
Authored-by: Mahmoud Bahaa <ma...@incorta.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../kyuubi/client/KyuubiSyncThriftClient.scala | 70 +++++++++++++++-------
1 file changed, 50 insertions(+), 20 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 917449dfb..1fe8bfb80 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -106,23 +106,14 @@ class KyuubiSyncThriftClient private (
}
private def withRetryingRequest[T](block: => T, request: String): T = withLockAcquired {
- var attemptCount = 1
-
- var resp: T = null.asInstanceOf[T]
- while (attemptCount <= maxAttempts && resp == null) {
- try {
- resp = block
- remoteEngineBroken = false
- } catch {
- case e: TException if attemptCount < maxAttempts && isConnectionValid() =>
- warn(s"Failed to execute $request after $attemptCount/$maxAttempts times, retrying", e)
- attemptCount += 1
- Thread.sleep(100)
- case e: Throwable =>
- error(s"Failed to execute $request after $attemptCount/$maxAttempts times, aborting", e)
- throw e
- }
- }
+ val (resp, shouldResetEngineBroken) = KyuubiSyncThriftClient.withRetryingRequestNoLock(
+ block,
+ request,
+ maxAttempts,
+ remoteEngineBroken,
+ isConnectionValid)
+
+ if (shouldResetEngineBroken) remoteEngineBroken = false
resp
}
@@ -386,7 +377,35 @@ class KyuubiSyncThriftClient private (
}
}
-private[kyuubi] object KyuubiSyncThriftClient {
+private[kyuubi] object KyuubiSyncThriftClient extends Logging {
+
+ private def withRetryingRequestNoLock[T](
+ block: => T,
+ request: String,
+ maxAttempts: Int,
+ remoteEngineBroken: Boolean,
+ isConnectionValid: () => Boolean): (T, Boolean) = {
+ var attemptCount = 1
+
+ var resp: T = null.asInstanceOf[T]
+ var shouldResetEngineBroken = false;
+ while (attemptCount <= maxAttempts && resp == null) {
+ try {
+ resp = block
+ shouldResetEngineBroken = true
+ } catch {
+ case e: TException if attemptCount < maxAttempts && isConnectionValid() =>
+ warn(s"Failed to execute $request after $attemptCount/$maxAttempts times, retrying", e)
+ attemptCount += 1
+ Thread.sleep(100)
+ case e: Throwable =>
+ error(s"Failed to execute $request after $attemptCount/$maxAttempts times, aborting", e)
+ throw e
+ }
+ }
+ (resp, shouldResetEngineBroken)
+ }
+
private def createTProtocol(
user: String,
passwd: String,
@@ -414,10 +433,21 @@ private[kyuubi] object KyuubiSyncThriftClient {
val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
- val tProtocol = createTProtocol(user, passwd, host, port, requestTimeout, loginTimeout)
+ val (tProtocol, _) = withRetryingRequestNoLock(
+ createTProtocol(user, passwd, host, port, requestTimeout, loginTimeout),
+ "CreatingTProtocol",
+ requestMaxAttempts,
+ false,
+ () => true)
+
val aliveProbeProtocol =
if (aliveProbeEnabled) {
- Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout))
+ Option(withRetryingRequestNoLock(
+ createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout),
+ "CreatingTProtocol",
+ requestMaxAttempts,
+ false,
+ () => true)._1)
} else {
None
}