You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/26 10:12:54 UTC
[incubator-kyuubi] branch branch-1.6 updated (2a9761694 -> 365469a8e)
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a change to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
from 2a9761694 [KYUUBI #3544] Fix bin/kyuubi stop check
new b792b9694 [KYUUBI #3549] Support query id in Flink engine
new 365469a8e [KYUUBI #3549][FOLLOWUP] Simplify test
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../kyuubi/engine/flink/operation/ExecuteStatement.scala | 4 ++++
.../engine/flink/operation/FlinkSQLOperationManager.scala | 9 ++++++++-
.../engine/flink/operation/FlinkOperationSuite.scala | 14 ++++++++++++++
3 files changed, 26 insertions(+), 1 deletion(-)
[incubator-kyuubi] 01/02: [KYUUBI #3549] Support query id in Flink engine
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
commit b792b96946ffce24587dc968fc51c04009042eaf
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Sep 26 15:14:51 2022 +0800
[KYUUBI #3549] Support query id in Flink engine
Flink engine now doesn't support query id. This PR fixes the problem.
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3550 from link3280/KYUUBI-3549.
Closes #3549
4f583cbe [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
b8808d73 [Paul Lin] [KYUUBI #3549] Simplify code
a68bc59b [Paul Lin] [KYUUBI #3549] Fix typo in the comments
e6217db2 [Paul Lin] [KYUUBI #3549] Support query id in Flink engine
Authored-by: Paul Lin <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kyuubi/engine/flink/operation/ExecuteStatement.scala | 4 ++++
.../flink/operation/FlinkSQLOperationManager.scala | 9 ++++++++-
.../engine/flink/operation/FlinkOperationSuite.scala | 16 +++++++++++++++-
3 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 47218276d..374f62f18 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -23,6 +23,7 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import org.apache.flink.api.common.JobID
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.TypedResult
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
@@ -54,6 +55,8 @@ class ExecuteStatement(
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
+ var jobId: Option[JobID] = None
+
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def beforeRun(): Unit = {
@@ -152,6 +155,7 @@ class ExecuteStatement(
private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
+ jobId = result.getJobClient.asScala.map(_.getJobID)
result.await()
resultSet = ResultSet.fromTableResult(result)
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 48a23b202..92f84b692 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -180,6 +180,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
}
override def getQueryId(operation: Operation): String = {
- throw KyuubiSQLException.featureNotSupported()
+ // return empty string instead of null if there's no query id
+ // otherwise there would be TTransportException
+ operation match {
+ case exec: ExecuteStatement => exec.jobId.map(_.toHexString).getOrElse("")
+ case _: PlanOnlyStatement => ""
+ case _ =>
+ throw new IllegalStateException(s"Unsupported Flink operation class $classOf[operation].")
+ }
}
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 75b22c93b..c08b7ffb8 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -19,10 +19,11 @@ package org.apache.kyuubi.engine.flink.operation
import java.nio.file.Files
import java.sql.DatabaseMetaData
-import java.util.UUID
+import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
+import org.apache.flink.api.common.JobID
import org.apache.flink.table.types.logical.LogicalTypeRoot
import org.apache.hive.service.rpc.thrift._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
+import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.service.ServiceState._
@@ -974,4 +976,16 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
assertDefaultDatabase(client, "default_database", true)
}
}
+
+ test("get query id") {
+ val conn = new KyuubiConnection(jdbcUrl, new Properties())
+ val stmt = conn.createStatement()
+ stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
+ assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
+ stmt.executeQuery("insert into tbl_a values (1)")
+ val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
+ assert(queryId !== null)
+ // parse the string to check if it's valid Flink job id
+ assert(JobID.fromHexString(queryId) !== null)
+ }
}
[incubator-kyuubi] 02/02: [KYUUBI #3549][FOLLOWUP] Simplify test
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
commit 365469a8e1834f2654b0e2bdfff78fc2018a7a8b
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Sep 26 18:06:06 2022 +0800
[KYUUBI #3549][FOLLOWUP] Simplify test
Simplify test by using helper method `withJdbcStatement`
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3556 from pan3793/followup.
Closes #3549
75103f85 [Cheng Pan] nit
449a2563 [Cheng Pan] [KYUUBI #3549][FOLLOWUP] Simplify test
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../flink/operation/FlinkOperationSuite.scala | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index c08b7ffb8..e8b3eefd9 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.flink.operation
import java.nio.file.Files
import java.sql.DatabaseMetaData
-import java.util.{Properties, UUID}
+import java.util.UUID
import scala.collection.JavaConverters._
@@ -35,7 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
-import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.service.ServiceState._
@@ -978,14 +978,14 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
test("get query id") {
- val conn = new KyuubiConnection(jdbcUrl, new Properties())
- val stmt = conn.createStatement()
- stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
- assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
- stmt.executeQuery("insert into tbl_a values (1)")
- val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
- assert(queryId !== null)
- // parse the string to check if it's valid Flink job id
- assert(JobID.fromHexString(queryId) !== null)
+ withJdbcStatement("tbl_a") { stmt =>
+ stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
+ assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
+ stmt.executeQuery("insert into tbl_a values (1)")
+ val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
+ assert(queryId !== null)
+ // parse the string to check if it's valid Flink job id
+ assert(JobID.fromHexString(queryId) !== null)
+ }
}
}