You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/26 10:12:55 UTC
[incubator-kyuubi] 01/02: [KYUUBI #3549] Support query id in Flink engine
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
commit b792b96946ffce24587dc968fc51c04009042eaf
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Sep 26 15:14:51 2022 +0800
[KYUUBI #3549] Support query id in Flink engine
Flink engine now doesn't support query id. This PR fixes the problem.
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3550 from link3280/KYUUBI-3549.
Closes #3549
4f583cbe [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
b8808d73 [Paul Lin] [KYUUBI #3549] Simplify code
a68bc59b [Paul Lin] [KYUUBI #3549] Fix typo in the comments
e6217db2 [Paul Lin] [KYUUBI #3549] Support query id in Flink engine
Authored-by: Paul Lin <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kyuubi/engine/flink/operation/ExecuteStatement.scala | 4 ++++
.../flink/operation/FlinkSQLOperationManager.scala | 9 ++++++++-
.../engine/flink/operation/FlinkOperationSuite.scala | 16 +++++++++++++++-
3 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 47218276d..374f62f18 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -23,6 +23,7 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import org.apache.flink.api.common.JobID
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.TypedResult
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
@@ -54,6 +55,8 @@ class ExecuteStatement(
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
+ var jobId: Option[JobID] = None
+
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def beforeRun(): Unit = {
@@ -152,6 +155,7 @@ class ExecuteStatement(
private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
+ jobId = result.getJobClient.asScala.map(_.getJobID)
result.await()
resultSet = ResultSet.fromTableResult(result)
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 48a23b202..92f84b692 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -180,6 +180,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
}
override def getQueryId(operation: Operation): String = {
- throw KyuubiSQLException.featureNotSupported()
+ // return empty string instead of null if there's no query id
+ // otherwise there would be TTransportException
+ operation match {
+ case exec: ExecuteStatement => exec.jobId.map(_.toHexString).getOrElse("")
+ case _: PlanOnlyStatement => ""
+ case _ =>
+ throw new IllegalStateException(s"Unsupported Flink operation class $classOf[operation].")
+ }
}
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 75b22c93b..c08b7ffb8 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -19,10 +19,11 @@ package org.apache.kyuubi.engine.flink.operation
import java.nio.file.Files
import java.sql.DatabaseMetaData
-import java.util.UUID
+import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
+import org.apache.flink.api.common.JobID
import org.apache.flink.table.types.logical.LogicalTypeRoot
import org.apache.hive.service.rpc.thrift._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
+import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.service.ServiceState._
@@ -974,4 +976,16 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
assertDefaultDatabase(client, "default_database", true)
}
}
+
+ test("get query id") {
+ val conn = new KyuubiConnection(jdbcUrl, new Properties())
+ val stmt = conn.createStatement()
+ stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
+ assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
+ stmt.executeQuery("insert into tbl_a values (1)")
+ val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
+ assert(queryId !== null)
+ // parse the string to check if it's valid Flink job id
+ assert(JobID.fromHexString(queryId) !== null)
+ }
}