You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/26 10:12:55 UTC

[incubator-kyuubi] 01/02: [KYUUBI #3549] Support query id in Flink engine

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git

commit b792b96946ffce24587dc968fc51c04009042eaf
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Sep 26 15:14:51 2022 +0800

    [KYUUBI #3549] Support query id in Flink engine
    
    The Flink engine currently does not support query ids. This PR adds that support.
    
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3550 from link3280/KYUUBI-3549.
    
    Closes #3549
    
    4f583cbe [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
    b8808d73 [Paul Lin] [KYUUBI #3549] Simplify code
    a68bc59b [Paul Lin] [KYUUBI #3549] Fix typo in the comments
    e6217db2 [Paul Lin] [KYUUBI #3549] Support query id in Flink engine
    
    Authored-by: Paul Lin <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/engine/flink/operation/ExecuteStatement.scala |  4 ++++
 .../flink/operation/FlinkSQLOperationManager.scala       |  9 ++++++++-
 .../engine/flink/operation/FlinkOperationSuite.scala     | 16 +++++++++++++++-
 3 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 47218276d..374f62f18 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -23,6 +23,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.flink.api.common.JobID
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.client.gateway.TypedResult
 import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
@@ -54,6 +55,8 @@ class ExecuteStatement(
   private val operationLog: OperationLog =
     OperationLog.createOperationLog(session, getHandle)
 
+  var jobId: Option[JobID] = None
+
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
   override protected def beforeRun(): Unit = {
@@ -152,6 +155,7 @@ class ExecuteStatement(
 
   private def runOperation(operation: Operation): Unit = {
     val result = executor.executeOperation(sessionId, operation)
+    jobId = result.getJobClient.asScala.map(_.getJobID) // None when the result has no job client
     result.await()
     resultSet = ResultSet.fromTableResult(result)
   }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 48a23b202..92f84b692 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -180,6 +180,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
   }
 
   override def getQueryId(operation: Operation): String = {
-    throw KyuubiSQLException.featureNotSupported()
+    // The query id is the hex string of the job id held by the ExecuteStatement; return an empty
+    // string instead of null if there's none, otherwise there would be TTransportException
+    operation match {
+      case exec: ExecuteStatement => exec.jobId.map(_.toHexString).getOrElse("")
+      case _: PlanOnlyStatement => ""
+      case other => throw new IllegalStateException(
+          s"Unsupported Flink operation class ${other.getClass.getName}.")
+    }
   }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 75b22c93b..c08b7ffb8 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -19,10 +19,11 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.nio.file.Files
 import java.sql.DatabaseMetaData
-import java.util.UUID
+import java.util.{Properties, UUID}
 
 import scala.collection.JavaConverters._
 
+import org.apache.flink.api.common.JobID
 import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.hive.service.rpc.thrift._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
 import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
+import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.service.ServiceState._
@@ -974,4 +976,16 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       assertDefaultDatabase(client, "default_database", true)
     }
   }
+
+  test("get query id") {
+    val conn = new KyuubiConnection(jdbcUrl, new Properties())
+    try { // release the connection even when an assertion below fails
+      val stmt = conn.createStatement().asInstanceOf[KyuubiStatement]
+      stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
+      assert(stmt.getQueryId === null)
+      stmt.executeQuery("insert into tbl_a values (1)")
+      val queryId = stmt.getQueryId // must be present and parse as a valid Flink job id
+      assert(queryId !== null && JobID.fromHexString(queryId) !== null)
+    } finally conn.close()
+  }
 }