You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/27 07:28:08 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2020] [Subtask] Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b41be9ebe [KYUUBI #2020] [Subtask] Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
b41be9ebe is described below
commit b41be9ebecabcc3a573cb6a12a97e3f833b76ba3
Author: yangrong688 <ya...@gmail.com>
AuthorDate: Wed Apr 27 15:27:57 2022 +0800
[KYUUBI #2020] [Subtask] Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
### _Why are the changes needed?_
Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2382 from yangrong688/KYUUBI-2020.
Closes #2020
97eccc07 [yangrong688] fix spotless check
efe01165 [yangrong688] add javadoc and add returns null if stmtHandle is null
a9d04e6c [yangrong688] fix
ac0dff2f [yangrong688] fix
3e935802 [yangrong688] skip test when java version beyond JAVA_1_8
2a777e26 [yangrong688] update getQueryId test case
ec12d2a2 [yangrong688] [feat] complete TGetQueryId op, should refined test case later
2996579f [yangrong] [feat] init operation getQueryId, just structure, need to improve quickly
Lead-authored-by: yangrong688 <ya...@gmail.com>
Co-authored-by: yangrong <ya...@gmail.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../flink/operation/FlinkSQLOperationManager.scala | 4 +++
.../hive/operation/HiveOperationManager.scala | 7 ++++
.../engine/hive/operation/HiveOperationSuite.scala | 14 ++++++++
.../spark/operation/SparkSQLOperationManager.scala | 4 +++
.../trino/operation/TrinoOperationManager.scala | 4 +++
.../apache/kyuubi/operation/OperationManager.scala | 1 +
.../kyuubi/service/AbstractBackendService.scala | 6 ++++
.../org/apache/kyuubi/service/BackendService.scala | 1 +
.../apache/kyuubi/service/TFrontendService.scala | 2 ++
.../apache/kyuubi/session/AbstractSession.scala | 6 ++++
.../scala/org/apache/kyuubi/session/Session.scala | 1 +
.../kyuubi/operation/NoopOperationManager.scala | 5 +++
.../kyuubi/service/TFrontendServiceSuite.scala | 14 ++++++++
.../apache/kyuubi/jdbc/hive/KyuubiStatement.java | 41 ++++++++++++++--------
.../kyuubi/client/KyuubiSyncThriftClient.scala | 6 ++++
.../kyuubi/operation/KyuubiOperationManager.scala | 12 +++++++
16 files changed, 114 insertions(+), 14 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index d2fb52a2b..9a98fe1b7 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -144,4 +144,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
foreignTable: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
+
+ override def getQueryId(operation: Operation): String = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index 12171a2f2..eb46cb8bc 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -22,6 +22,7 @@ import java.util.List
import scala.collection.JavaConverters._
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hive.service.cli.{RowSetFactory, TableSchema}
import org.apache.hive.service.rpc.thrift.TRowSet
@@ -161,4 +162,10 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
rowSet.toTRowSet
}
+
+ override def getQueryId(operation: Operation): String = {
+ val hiveOperation = operation.asInstanceOf[HiveOperation]
+ val internalHiveOperation = hiveOperation.internalHiveOperation
+ internalHiveOperation.getParentSession.getHiveConf.getVar(ConfVars.HIVEQUERYID)
+ }
}
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index 36b6482d6..3595b35c2 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -17,8 +17,11 @@
package org.apache.kyuubi.engine.hive.operation
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+
import org.apache.kyuubi.{HiveEngineTests, Utils}
import org.apache.kyuubi.engine.hive.HiveSQLEngine
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement
class HiveOperationSuite extends HiveEngineTests {
@@ -35,4 +38,15 @@ class HiveOperationSuite extends HiveEngineTests {
override protected def jdbcUrl: String = {
"jdbc:hive2://" + HiveSQLEngine.currentEngine.get.frontendServices.head.connectionUrl + "/;"
}
+
+ test("test get query id") {
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
+ withJdbcStatement("hive_engine_test") { statement =>
+ statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc")
+ statement.execute("INSERT INTO hive_engine_test SELECT 1, '2'")
+ statement.executeQuery("SELECT ID, VALUE FROM hive_engine_test")
+ val kyuubiStatement = statement.asInstanceOf[KyuubiStatement]
+ assert(kyuubiStatement.getQueryId != null)
+ }
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index e35dbbc1a..bc7b84390 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -151,4 +151,8 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
foreignTable: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
+
+ override def getQueryId(operation: Operation): String = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index 0ddf7ad08..8c0e195f2 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -116,4 +116,8 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
foreignTable: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
+
+ override def getQueryId(operation: Operation): String = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 503487267..e8c107fd0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -85,6 +85,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation
+ def getQueryId(operation: Operation): String
final def addOperation(operation: Operation): Operation = synchronized {
handleToOperation.put(operation.getHandle, operation)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index b24123ab3..f019a4e44 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -150,6 +150,12 @@ abstract class AbstractBackendService(name: String)
foreignTable)
}
+ override def getQueryId(operationHandle: OperationHandle): String = {
+ val operation = sessionManager.operationManager.getOperation(operationHandle)
+ val queryId = sessionManager.operationManager.getQueryId(operation)
+ queryId
+ }
+
override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
val operation = sessionManager.operationManager.getOperation(operationHandle)
if (operation.shouldRunAsync) {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
index 638b7e66b..1fb2f74a6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
@@ -89,6 +89,7 @@ trait BackendService {
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle
+ def getQueryId(operationHandle: OperationHandle): String
def getOperationStatus(operationHandle: OperationHandle): OperationStatus
def cancelOperation(operationHandle: OperationHandle): Unit
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index 7789ffb0a..d38c9bc81 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -529,6 +529,8 @@ abstract class TFrontendService(name: String)
override def GetQueryId(req: TGetQueryIdReq): TGetQueryIdResp = {
debug(req.toString)
val resp = new TGetQueryIdResp
+ val queryId = be.getQueryId(OperationHandle(req.getOperationHandle))
+ resp.setQueryId(queryId)
resp
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 750fadbfc..270606b92 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -200,6 +200,12 @@ abstract class AbstractSession(
runOperation(operation)
}
+ override def getQueryId(operationHandle: OperationHandle): String = {
+ val operation = sessionManager.operationManager.getOperation(operationHandle)
+ val queryId = sessionManager.operationManager.getQueryId(operation)
+ queryId
+ }
+
override def cancelOperation(operationHandle: OperationHandle): Unit = withAcquireRelease() {
sessionManager.operationManager.cancelOperation(operationHandle)
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index f9b8c2255..7e283b670 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -80,6 +80,7 @@ trait Session {
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle
+ def getQueryId(operationHandle: OperationHandle): String
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
index ccd384466..4f0cfaf13 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
@@ -125,4 +125,9 @@ class NoopOperationManager extends OperationManager("noop") {
tRow.addToColumns(tColumn)
tRow
}
+
+ override def getQueryId(operation: Operation): String = {
+ val queryId = "noop_query_id"
+ queryId
+ }
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
index 1f7ec181e..d092bd0ad 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
@@ -356,6 +356,20 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
}
}
+ test("get query id") {
+ withSessionHandle { (client, handle) =>
+ val req = new TExecuteStatementReq()
+ req.setStatement("select 1")
+ req.setSessionHandle(handle)
+ req.setRunAsync(false)
+ val resp = client.ExecuteStatement(req)
+ val opHandle = resp.getOperationHandle
+ val req1 = new TGetQueryIdReq(opHandle)
+ val resp1 = client.GetQueryId(req1)
+ assert(resp1.getQueryId === "noop_query_id")
+ }
+ }
+
test("get operation status") {
withSessionHandle { (client, handle) =>
val opHandle =
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
index b87f67ffc..48a255cc7 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
@@ -17,6 +17,7 @@
package org.apache.kyuubi.jdbc.hive;
+import com.google.common.annotations.VisibleForTesting;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -24,22 +25,10 @@ import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLTimeoutException;
import java.sql.SQLWarning;
import java.util.*;
+import org.apache.commons.lang.StringUtils;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.rpc.thrift.TCLIService;
-import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
-import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
-import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
-import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
-import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
-import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
-import org.apache.hive.service.rpc.thrift.TFetchOrientation;
-import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
-import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
-import org.apache.hive.service.rpc.thrift.TOperationHandle;
-import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.hive.service.rpc.thrift.*;
import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
import org.apache.thrift.TException;
@@ -989,6 +978,30 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable {
return null;
}
+ /**
+ * Returns the Query ID if it is running. This method is a public API for usage outside of Hive,
+ * although it is not part of the interface java.sql.Statement.
+ *
+ * @return Valid query ID if it is running else returns NULL.
+ * @throws SQLException If any internal failures.
+ */
+ @VisibleForTesting
+ public String getQueryId() throws SQLException {
+ if (stmtHandle == null) {
+ // If query is not running or already closed.
+ return null;
+ }
+
+ try {
+ final String queryId = client.GetQueryId(new TGetQueryIdReq(stmtHandle)).getQueryId();
+
+ // queryId can be empty string if query was already closed. Need to return null in such case.
+ return StringUtils.isBlank(queryId) ? null : queryId;
+ } catch (TException e) {
+ throw new SQLException(e);
+ }
+ }
+
/**
* This is only used by the beeline client to set the stream on which in place progress updates
* are to be shown
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 4202bea5f..419a48249 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -310,6 +310,12 @@ class KyuubiSyncThriftClient private (
resp.getOperationHandle
}
+ def getQueryId(operationHandle: TOperationHandle): TGetQueryIdResp = {
+ val req = new TGetQueryIdReq(operationHandle)
+ val resp = withRetryingRequest(GetQueryId(req), "GetQueryId")
+ resp
+ }
+
def getOperationStatus(operationHandle: TOperationHandle): TGetOperationStatusResp = {
val req = new TGetOperationStatusReq(operationHandle)
val resp = withRetryingRequest(GetOperationStatus(req), "GetOperationStatus")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index e7a7d3e6c..69ebbb584 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -151,6 +151,18 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
addOperation(operation)
}
+ override def getQueryId(operation: Operation): String = {
+ val kyuubiOperation = operation.asInstanceOf[KyuubiOperation]
+ val client = kyuubiOperation.client
+ val remoteHandle = kyuubiOperation.remoteOpHandle()
+ if (remoteHandle != null) {
+ val queryId = client.getQueryId(remoteHandle).getQueryId
+ queryId
+ } else {
+ null
+ }
+ }
+
def newLaunchEngineOperation(session: KyuubiSessionImpl, shouldRunAsync: Boolean): Operation = {
val operation = new LaunchEngine(session, shouldRunAsync)
addOperation(operation)