You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/09/21 06:04:12 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3506] Send credentials if needed before kyuubi operation run to prevent token expired issue
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 96faed7bd [KYUUBI #3506] Send credentials if needed before kyuubi operation run to prevent token expired issue
96faed7bd is described below
commit 96faed7bd8279e5ef03a185b8b638f601a54a3e2
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Wed Sep 21 14:04:02 2022 +0800
[KYUUBI #3506] Send credentials if needed before kyuubi operation run to prevent token expired issue
### _Why are the changes needed?_
We met token expired issue.
```
"org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 57938358 for b_stf) can't be found in cache
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1907)
at org.apache.hadoop.ipc.Client.call(Client.java:1831)
at org.apache.hadoop.ipc.Client.call(Client.java:1719)
at org.apache.hadoop.ipc.Client.call(Client.java:1648)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:260)
at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:1184)
at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:306)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:207)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:189)
at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2184)
```
The engine share level is CONNECTION.
We keep the connection alive with KyuubiConnection::isValid, the max idle time is 3d.
There is no statements executed in last 24h and it caused the hdfs delegation expired.
Then I want to execute a statement and met above issue.
### _How was this patch tested?_
Pass existing UT.
Closes #3506 from turboFei/send_creds.
Closes #3506
cead06b56 [Fei Wang] before run
d26400e3b [Fei Wang] send credentials before
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../scala/org/apache/kyuubi/operation/ExecuteStatement.scala | 12 ++----------
.../scala/org/apache/kyuubi/operation/KyuubiOperation.scala | 12 +++++++++++-
2 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index bd7cfe041..10d41c922 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -27,7 +27,7 @@ import org.apache.kyuubi.events.{EventBus, KyuubiOperationEvent}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Session}
+import org.apache.kyuubi.session.Session
class ExecuteStatement(
session: Session,
@@ -51,6 +51,7 @@ class ExecuteStatement(
OperationLog.setCurrentOperationLog(_operationLog)
setHasResultSet(true)
setState(OperationState.PENDING)
+ sendCredentialsIfNeeded()
}
override protected def afterRun(): Unit = {
@@ -127,15 +128,6 @@ class ExecuteStatement(
fetchQueryLog()
} catch onError()
- private def sendCredentialsIfNeeded(): Unit = {
- val appUser = session.asInstanceOf[KyuubiSessionImpl].engine.appUser
- val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager]
- sessionManager.credentialsManager.sendCredentialsIfNeeded(
- session.handle.identifier.toString,
- appUser,
- client.sendCredentials)
- }
-
private def fetchQueryLog(): Unit = {
getOperationLog.foreach { logger =>
try {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 2d28c767e..bab139ff4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.metrics.MetricsConstants.{OPERATION_FAIL, OPERATION_OPE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.OperationState
-import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Session}
import org.apache.kyuubi.util.ThriftUtils
abstract class KyuubiOperation(session: Session) extends AbstractOperation(session) {
@@ -84,6 +84,16 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
override protected def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.RUNNING)
+ sendCredentialsIfNeeded()
+ }
+
+ protected def sendCredentialsIfNeeded(): Unit = {
+ val appUser = session.asInstanceOf[KyuubiSessionImpl].engine.appUser
+ val sessionManager = session.sessionManager.asInstanceOf[KyuubiSessionManager]
+ sessionManager.credentialsManager.sendCredentialsIfNeeded(
+ session.handle.identifier.toString,
+ appUser,
+ client.sendCredentials)
}
override protected def afterRun(): Unit = {