You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/06/02 05:41:13 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2802] Retry opening the TSocket in KyuubiSyncThriftClient

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21845266b [KYUUBI #2802] Retry opening the TSocket in KyuubiSyncThriftClient
21845266b is described below

commit 21845266b3ba80494226039c08758ec506656503
Author: Mahmoud Bahaa <ma...@incorta.com>
AuthorDate: Thu Jun 2 13:41:06 2022 +0800

    [KYUUBI #2802] Retry opening the TSocket in KyuubiSyncThriftClient
    
    ### _Why are the changes needed?_
    
    The new option **kyuubi.operation.thrift.client.request.max.attempts** is not respected in `createTProtocol` in `KyuubiSyncThriftClient`, so opening the connection fails on the first error and is never retried, even when **kyuubi.operation.thrift.client.request.max.attempts** is greater than 1. This change adjusts the code to retry in that case. (My Scala is a bit rusty, so please forgive any uncleanness in the code.)
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2802 from mahmoudbahaa/expand-thrift-max-attempt-usage.
    
    Closes #2802
    
    bab6e835c [Mahmoud Bahaa] Retry opening the TSocket in KyuubiSyncThriftClient using kyuubi.operation.thrift.client.request.max.attempts
    
    Authored-by: Mahmoud Bahaa <ma...@incorta.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../kyuubi/client/KyuubiSyncThriftClient.scala     | 70 +++++++++++++++-------
 1 file changed, 50 insertions(+), 20 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 917449dfb..1fe8bfb80 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -106,23 +106,14 @@ class KyuubiSyncThriftClient private (
   }
 
   private def withRetryingRequest[T](block: => T, request: String): T = withLockAcquired {
-    var attemptCount = 1
-
-    var resp: T = null.asInstanceOf[T]
-    while (attemptCount <= maxAttempts && resp == null) {
-      try {
-        resp = block
-        remoteEngineBroken = false
-      } catch {
-        case e: TException if attemptCount < maxAttempts && isConnectionValid() =>
-          warn(s"Failed to execute $request after $attemptCount/$maxAttempts times, retrying", e)
-          attemptCount += 1
-          Thread.sleep(100)
-        case e: Throwable =>
-          error(s"Failed to execute $request after $attemptCount/$maxAttempts times, aborting", e)
-          throw e
-      }
-    }
+    val (resp, shouldResetEngineBroken) = KyuubiSyncThriftClient.withRetryingRequestNoLock(
+      block,
+      request,
+      maxAttempts,
+      remoteEngineBroken,
+      isConnectionValid)
+
+    if (shouldResetEngineBroken) remoteEngineBroken = false
     resp
   }
 
@@ -386,7 +377,35 @@ class KyuubiSyncThriftClient private (
   }
 }
 
-private[kyuubi] object KyuubiSyncThriftClient {
+private[kyuubi] object KyuubiSyncThriftClient extends Logging {
+
+  private def withRetryingRequestNoLock[T](
+      block: => T,
+      request: String,
+      maxAttempts: Int,
+      remoteEngineBroken: Boolean,
+      isConnectionValid: () => Boolean): (T, Boolean) = {
+    var attemptCount = 1
+
+    var resp: T = null.asInstanceOf[T]
+    var shouldResetEngineBroken = false;
+    while (attemptCount <= maxAttempts && resp == null) {
+      try {
+        resp = block
+        shouldResetEngineBroken = true
+      } catch {
+        case e: TException if attemptCount < maxAttempts && isConnectionValid() =>
+          warn(s"Failed to execute $request after $attemptCount/$maxAttempts times, retrying", e)
+          attemptCount += 1
+          Thread.sleep(100)
+        case e: Throwable =>
+          error(s"Failed to execute $request after $attemptCount/$maxAttempts times, aborting", e)
+          throw e
+      }
+    }
+    (resp, shouldResetEngineBroken)
+  }
+
   private def createTProtocol(
       user: String,
       passwd: String,
@@ -414,10 +433,21 @@ private[kyuubi] object KyuubiSyncThriftClient {
     val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
     val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
 
-    val tProtocol = createTProtocol(user, passwd, host, port, requestTimeout, loginTimeout)
+    val (tProtocol, _) = withRetryingRequestNoLock(
+      createTProtocol(user, passwd, host, port, requestTimeout, loginTimeout),
+      "CreatingTProtocol",
+      requestMaxAttempts,
+      false,
+      () => true)
+
     val aliveProbeProtocol =
       if (aliveProbeEnabled) {
-        Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout))
+        Option(withRetryingRequestNoLock(
+          createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout),
+          "CreatingTProtocol",
+          requestMaxAttempts,
+          false,
+          () => true)._1)
       } else {
         None
       }