You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/09/21 06:04:12 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3506] Send credentials if needed before kyuubi operation run to prevent token expired issue

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 96faed7bd [KYUUBI #3506] Send credentials if needed before kyuubi operation run to prevent token expired issue
96faed7bd is described below

commit 96faed7bd8279e5ef03a185b8b638f601a54a3e2
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Wed Sep 21 14:04:02 2022 +0800

    [KYUUBI #3506] Send credentials if needed before kyuubi operation run to prevent token expired issue
    
    ### _Why are the changes needed?_
    
    We encountered a token-expired issue.
    
    ```
    "org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 57938358 for b_stf) can't be found in cache
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1907)
    at org.apache.hadoop.ipc.Client.call(Client.java:1831)
    at org.apache.hadoop.ipc.Client.call(Client.java:1719)
    at org.apache.hadoop.ipc.Client.call(Client.java:1648)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:260)
    at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:1184)
    at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:306)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:207)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:189)
    at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2184)
    ```
    
    The engine share level is CONNECTION.
    
    We keep the connection alive with KyuubiConnection::isValid; the max idle time is 3d.
    
    No statements were executed in the last 24h, which caused the HDFS delegation token to expire.
    
    When I then tried to execute a statement, I hit the above issue.
    
    ### _How was this patch tested?_
    Pass existing UT.
    
    Closes #3506 from turboFei/send_creds.
    
    Closes #3506
    
    cead06b56 [Fei Wang] before run
    d26400e3b [Fei Wang] send credentials before
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 12 ++----------
 .../scala/org/apache/kyuubi/operation/KyuubiOperation.scala  | 12 +++++++++++-
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index bd7cfe041..10d41c922 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -27,7 +27,7 @@ import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
 import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Session}
+import org.apache.kyuubi.session.Session
 
 class ExecuteStatement(
     session: Session,
@@ -51,6 +51,7 @@ class ExecuteStatement(
     OperationLog.setCurrentOperationLog(_operationLog)
     setHasResultSet(true)
     setState(OperationState.PENDING)
+    sendCredentialsIfNeeded()
   }
 
   override protected def afterRun(): Unit = {
@@ -127,15 +128,6 @@ class ExecuteStatement(
       fetchQueryLog()
     } catch onError()
 
-  private def sendCredentialsIfNeeded(): Unit = {
-    val appUser = session.asInstanceOf[KyuubiSessionImpl].engine.appUser
-    val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager]
-    sessionManager.credentialsManager.sendCredentialsIfNeeded(
-      session.handle.identifier.toString,
-      appUser,
-      client.sendCredentials)
-  }
-
   private def fetchQueryLog(): Unit = {
     getOperationLog.foreach { logger =>
       try {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 2d28c767e..bab139ff4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.metrics.MetricsConstants.{OPERATION_FAIL, OPERATION_OPE
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationState.OperationState
-import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Session}
 import org.apache.kyuubi.util.ThriftUtils
 
 abstract class KyuubiOperation(session: Session) extends AbstractOperation(session) {
@@ -84,6 +84,16 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
   override protected def beforeRun(): Unit = {
     setHasResultSet(true)
     setState(OperationState.RUNNING)
+    sendCredentialsIfNeeded()
+  }
+
+  protected def sendCredentialsIfNeeded(): Unit = {
+    val appUser = session.asInstanceOf[KyuubiSessionImpl].engine.appUser
+    val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager]
+    sessionManager.credentialsManager.sendCredentialsIfNeeded(
+      session.handle.identifier.toString,
+      appUser,
+      client.sendCredentials)
   }
 
   override protected def afterRun(): Unit = {