You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/27 07:28:08 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #2020] [Subtask] Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b41be9ebe [KYUUBI #2020] [Subtask] Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
b41be9ebe is described below

commit b41be9ebecabcc3a573cb6a12a97e3f833b76ba3
Author: yangrong688 <ya...@gmail.com>
AuthorDate: Wed Apr 27 15:27:57 2022 +0800

    [KYUUBI #2020] [Subtask] Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
    
    ### _Why are the changes needed?_
    
    Hive Backend Engine - new APIs with hive-service-rpc 3.1.2 - TGetQueryId
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2382 from yangrong688/KYUUBI-2020.
    
    Closes #2020
    
    97eccc07 [yangrong688] fix spotless check
    efe01165 [yangrong688] add javadoc and return null if stmtHandle is null
    a9d04e6c [yangrong688] fix
    ac0dff2f [yangrong688] fix
    3e935802 [yangrong688] skip test when java version is beyond JAVA_1_8
    2a777e26 [yangrong688] update getQueryId test case
    ec12d2a2 [yangrong688] [feat] complete TGetQueryId op, should refine test case later
    2996579f [yangrong] [feat] init operation getQueryId, just structure, need to improve quickly
    
    Lead-authored-by: yangrong688 <ya...@gmail.com>
    Co-authored-by: yangrong <ya...@gmail.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../flink/operation/FlinkSQLOperationManager.scala |  4 +++
 .../hive/operation/HiveOperationManager.scala      |  7 ++++
 .../engine/hive/operation/HiveOperationSuite.scala | 14 ++++++++
 .../spark/operation/SparkSQLOperationManager.scala |  4 +++
 .../trino/operation/TrinoOperationManager.scala    |  4 +++
 .../apache/kyuubi/operation/OperationManager.scala |  1 +
 .../kyuubi/service/AbstractBackendService.scala    |  6 ++++
 .../org/apache/kyuubi/service/BackendService.scala |  1 +
 .../apache/kyuubi/service/TFrontendService.scala   |  2 ++
 .../apache/kyuubi/session/AbstractSession.scala    |  6 ++++
 .../scala/org/apache/kyuubi/session/Session.scala  |  1 +
 .../kyuubi/operation/NoopOperationManager.scala    |  5 +++
 .../kyuubi/service/TFrontendServiceSuite.scala     | 14 ++++++++
 .../apache/kyuubi/jdbc/hive/KyuubiStatement.java   | 41 ++++++++++++++--------
 .../kyuubi/client/KyuubiSyncThriftClient.scala     |  6 ++++
 .../kyuubi/operation/KyuubiOperationManager.scala  | 12 +++++++
 16 files changed, 114 insertions(+), 14 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index d2fb52a2b..9a98fe1b7 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -144,4 +144,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
       foreignTable: String): Operation = {
     throw KyuubiSQLException.featureNotSupported()
   }
+
+  override def getQueryId(operation: Operation): String = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 }
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index 12171a2f2..eb46cb8bc 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -22,6 +22,7 @@ import java.util.List
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
 import org.apache.hive.service.cli.{RowSetFactory, TableSchema}
 import org.apache.hive.service.rpc.thrift.TRowSet
@@ -161,4 +162,10 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
 
     rowSet.toTRowSet
   }
+
+  override def getQueryId(operation: Operation): String = {
+    val hiveOperation = operation.asInstanceOf[HiveOperation]
+    val internalHiveOperation = hiveOperation.internalHiveOperation
+    internalHiveOperation.getParentSession.getHiveConf.getVar(ConfVars.HIVEQUERYID)
+  }
 }
diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index 36b6482d6..3595b35c2 100644
--- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -17,8 +17,11 @@
 
 package org.apache.kyuubi.engine.hive.operation
 
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+
 import org.apache.kyuubi.{HiveEngineTests, Utils}
 import org.apache.kyuubi.engine.hive.HiveSQLEngine
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement
 
 class HiveOperationSuite extends HiveEngineTests {
 
@@ -35,4 +38,15 @@ class HiveOperationSuite extends HiveEngineTests {
   override protected def jdbcUrl: String = {
     "jdbc:hive2://" + HiveSQLEngine.currentEngine.get.frontendServices.head.connectionUrl + "/;"
   }
+
+  test("test get query id") {
+    assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8))
+    withJdbcStatement("hive_engine_test") { statement =>
+      statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc")
+      statement.execute("INSERT INTO hive_engine_test SELECT 1, '2'")
+      statement.executeQuery("SELECT ID, VALUE FROM hive_engine_test")
+      val kyuubiStatement = statement.asInstanceOf[KyuubiStatement]
+      assert(kyuubiStatement.getQueryId != null)
+    }
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index e35dbbc1a..bc7b84390 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -151,4 +151,8 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
       foreignTable: String): Operation = {
     throw KyuubiSQLException.featureNotSupported()
   }
+
+  override def getQueryId(operation: Operation): String = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 }
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index 0ddf7ad08..8c0e195f2 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -116,4 +116,8 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
       foreignTable: String): Operation = {
     throw KyuubiSQLException.featureNotSupported()
   }
+
+  override def getQueryId(operation: Operation): String = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 503487267..e8c107fd0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -85,6 +85,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
       foreignCatalog: String,
       foreignSchema: String,
       foreignTable: String): Operation
+  def getQueryId(operation: Operation): String
 
   final def addOperation(operation: Operation): Operation = synchronized {
     handleToOperation.put(operation.getHandle, operation)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index b24123ab3..f019a4e44 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -150,6 +150,12 @@ abstract class AbstractBackendService(name: String)
         foreignTable)
   }
 
+  override def getQueryId(operationHandle: OperationHandle): String = {
+    val operation = sessionManager.operationManager.getOperation(operationHandle)
+    val queryId = sessionManager.operationManager.getQueryId(operation)
+    queryId
+  }
+
   override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
     val operation = sessionManager.operationManager.getOperation(operationHandle)
     if (operation.shouldRunAsync) {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
index 638b7e66b..1fb2f74a6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
@@ -89,6 +89,7 @@ trait BackendService {
       foreignCatalog: String,
       foreignSchema: String,
       foreignTable: String): OperationHandle
+  def getQueryId(operationHandle: OperationHandle): String
 
   def getOperationStatus(operationHandle: OperationHandle): OperationStatus
   def cancelOperation(operationHandle: OperationHandle): Unit
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index 7789ffb0a..d38c9bc81 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -529,6 +529,8 @@ abstract class TFrontendService(name: String)
   override def GetQueryId(req: TGetQueryIdReq): TGetQueryIdResp = {
     debug(req.toString)
     val resp = new TGetQueryIdResp
+    val queryId = be.getQueryId(OperationHandle(req.getOperationHandle))
+    resp.setQueryId(queryId)
     resp
   }
 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 750fadbfc..270606b92 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -200,6 +200,12 @@ abstract class AbstractSession(
     runOperation(operation)
   }
 
+  override def getQueryId(operationHandle: OperationHandle): String = {
+    val operation = sessionManager.operationManager.getOperation(operationHandle)
+    val queryId = sessionManager.operationManager.getQueryId(operation)
+    queryId
+  }
+
   override def cancelOperation(operationHandle: OperationHandle): Unit = withAcquireRelease() {
     sessionManager.operationManager.cancelOperation(operationHandle)
   }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index f9b8c2255..7e283b670 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -80,6 +80,7 @@ trait Session {
       foreignCatalog: String,
       foreignSchema: String,
       foreignTable: String): OperationHandle
+  def getQueryId(operationHandle: OperationHandle): String
 
   def cancelOperation(operationHandle: OperationHandle): Unit
   def closeOperation(operationHandle: OperationHandle): Unit
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
index ccd384466..4f0cfaf13 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
@@ -125,4 +125,9 @@ class NoopOperationManager extends OperationManager("noop") {
     tRow.addToColumns(tColumn)
     tRow
   }
+
+  override def getQueryId(operation: Operation): String = {
+    val queryId = "noop_query_id"
+    queryId
+  }
 }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
index 1f7ec181e..d092bd0ad 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
@@ -356,6 +356,20 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
     }
   }
 
+  test("get query id") {
+    withSessionHandle { (client, handle) =>
+      val req = new TExecuteStatementReq()
+      req.setStatement("select 1")
+      req.setSessionHandle(handle)
+      req.setRunAsync(false)
+      val resp = client.ExecuteStatement(req)
+      val opHandle = resp.getOperationHandle
+      val req1 = new TGetQueryIdReq(opHandle)
+      val resp1 = client.GetQueryId(req1)
+      assert(resp1.getQueryId === "noop_query_id")
+    }
+  }
+
   test("get operation status") {
     withSessionHandle { (client, handle) =>
       val opHandle =
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
index b87f67ffc..48a255cc7 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.jdbc.hive;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -24,22 +25,10 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLTimeoutException;
 import java.sql.SQLWarning;
 import java.util.*;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.rpc.thrift.TCLIService;
-import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
-import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
-import org.apache.hive.service.rpc.thrift.TCloseOperationReq;
-import org.apache.hive.service.rpc.thrift.TCloseOperationResp;
-import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
-import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
-import org.apache.hive.service.rpc.thrift.TFetchOrientation;
-import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
-import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
-import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
-import org.apache.hive.service.rpc.thrift.TOperationHandle;
-import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.hive.service.rpc.thrift.*;
 import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
 import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
 import org.apache.thrift.TException;
@@ -989,6 +978,30 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable {
     return null;
   }
 
+  /**
+   * Returns the Query ID if it is running. This method is a public API for usage outside of Hive,
+   * although it is not part of the interface java.sql.Statement.
+   *
+   * @return Valid query ID if it is running else returns NULL.
+   * @throws SQLException If any internal failures.
+   */
+  @VisibleForTesting
+  public String getQueryId() throws SQLException {
+    if (stmtHandle == null) {
+      // If query is not running or already closed.
+      return null;
+    }
+
+    try {
+      final String queryId = client.GetQueryId(new TGetQueryIdReq(stmtHandle)).getQueryId();
+
+      // queryId can be empty string if query was already closed. Need to return null in such case.
+      return StringUtils.isBlank(queryId) ? null : queryId;
+    } catch (TException e) {
+      throw new SQLException(e);
+    }
+  }
+
   /**
    * This is only used by the beeline client to set the stream on which in place progress updates
    * are to be shown
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 4202bea5f..419a48249 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -310,6 +310,12 @@ class KyuubiSyncThriftClient private (
     resp.getOperationHandle
   }
 
+  def getQueryId(operationHandle: TOperationHandle): TGetQueryIdResp = {
+    val req = new TGetQueryIdReq(operationHandle)
+    val resp = withRetryingRequest(GetQueryId(req), "GetQueryId")
+    resp
+  }
+
   def getOperationStatus(operationHandle: TOperationHandle): TGetOperationStatusResp = {
     val req = new TGetOperationStatusReq(operationHandle)
     val resp = withRetryingRequest(GetOperationStatus(req), "GetOperationStatus")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index e7a7d3e6c..69ebbb584 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -151,6 +151,18 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
     addOperation(operation)
   }
 
+  override def getQueryId(operation: Operation): String = {
+    val kyuubiOperation = operation.asInstanceOf[KyuubiOperation]
+    val client = kyuubiOperation.client
+    val remoteHandle = kyuubiOperation.remoteOpHandle()
+    if (remoteHandle != null) {
+      val queryId = client.getQueryId(remoteHandle).getQueryId
+      queryId
+    } else {
+      null
+    }
+  }
+
   def newLaunchEngineOperation(session: KyuubiSessionImpl, shouldRunAsync: Boolean): Operation = {
     val operation = new LaunchEngine(session, shouldRunAsync)
     addOperation(operation)