Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/26 10:12:54 UTC

[incubator-kyuubi] branch branch-1.6 updated (2a9761694 -> 365469a8e)

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a change to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


    from 2a9761694 [KYUUBI #3544] Fix bin/kyuubi stop check
     new b792b9694 [KYUUBI #3549] Support query id in Flink engine
     new 365469a8e [KYUUBI #3549][FOLLOWUP] Simplify test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kyuubi/engine/flink/operation/ExecuteStatement.scala   |  4 ++++
 .../engine/flink/operation/FlinkSQLOperationManager.scala  |  9 ++++++++-
 .../engine/flink/operation/FlinkOperationSuite.scala       | 14 ++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)


[incubator-kyuubi] 01/02: [KYUUBI #3549] Support query id in Flink engine

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git

commit b792b96946ffce24587dc968fc51c04009042eaf
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Sep 26 15:14:51 2022 +0800

    [KYUUBI #3549] Support query id in Flink engine
    
    The Flink engine currently does not support query IDs. This PR adds that support; a client-side usage sketch follows the changed-files summary below.
    
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3550 from link3280/KYUUBI-3549.
    
    Closes #3549
    
    4f583cbe [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
    b8808d73 [Paul Lin] [KYUUBI #3549] Simplify code
    a68bc59b [Paul Lin] [KYUUBI #3549] Fix typo in the comments
    e6217db2 [Paul Lin] [KYUUBI #3549] Support query id in Flink engine
    
    Authored-by: Paul Lin <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/engine/flink/operation/ExecuteStatement.scala |  4 ++++
 .../flink/operation/FlinkSQLOperationManager.scala       |  9 ++++++++-
 .../engine/flink/operation/FlinkOperationSuite.scala     | 16 +++++++++++++++-
 3 files changed, 27 insertions(+), 2 deletions(-)
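Illustrative aside (not part of the commit): once getQueryId is wired up, a JDBC client can correlate a statement with its Flink job by parsing KyuubiStatement.getQueryId back into a JobID, which is what the new test below exercises. A minimal sketch, assuming a statement that has just run a DML query against a Flink engine; the helper name is made up, and getQueryId is expected to return null when no job is attached:

    import org.apache.flink.api.common.JobID
    import org.apache.kyuubi.jdbc.hive.KyuubiStatement

    // Hypothetical client-side helper: a DDL or plan-only statement yields no job,
    // so the result is None; a DML query yields Some(jobId) parsed from the hex string.
    def flinkJobIdOf(stmt: KyuubiStatement): Option[JobID] =
      Option(stmt.getQueryId).filter(_.nonEmpty).map(JobID.fromHexString)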

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 47218276d..374f62f18 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -23,6 +23,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.flink.api.common.JobID
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.client.gateway.TypedResult
 import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
@@ -54,6 +55,8 @@ class ExecuteStatement(
   private val operationLog: OperationLog =
     OperationLog.createOperationLog(session, getHandle)
 
+  var jobId: Option[JobID] = None
+
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
 
   override protected def beforeRun(): Unit = {
@@ -152,6 +155,7 @@ class ExecuteStatement(
 
   private def runOperation(operation: Operation): Unit = {
     val result = executor.executeOperation(sessionId, operation)
+    jobId = result.getJobClient.asScala.map(_.getJobID)
     result.await()
     resultSet = ResultSet.fromTableResult(result)
   }
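Aside for context (not part of the commit): the jobId capture above relies on Flink's TableResult exposing its JobClient as a java.util.Optional, which is empty for DDL results. A standalone sketch of the same extraction against plain Flink APIs, spelling out the Optional handling instead of the implicit .asScala conversion used above:

    import java.util.Optional

    import org.apache.flink.api.common.JobID
    import org.apache.flink.core.execution.JobClient
    import org.apache.flink.table.api.TableResult

    // Queries and DML submit a Flink job and therefore carry a JobClient;
    // DDL results do not, so the extracted JobID is naturally optional.
    def extractJobId(result: TableResult): Option[JobID] = {
      val client: Optional[JobClient] = result.getJobClient
      if (client.isPresent) Some(client.get().getJobID) else None
    }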
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 48a23b202..92f84b692 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -180,6 +180,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
   }
 
   override def getQueryId(operation: Operation): String = {
-    throw KyuubiSQLException.featureNotSupported()
+    // return an empty string instead of null when there is no query id,
+    // otherwise the Thrift layer would throw a TTransportException
+    operation match {
+      case exec: ExecuteStatement => exec.jobId.map(_.toHexString).getOrElse("")
+      case _: PlanOnlyStatement => ""
+      case _ =>
+        throw new IllegalStateException(s"Unsupported Flink operation class ${operation.getClass}.")
+    }
   }
 }
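Aside for context (not part of the commit): getQueryId above encodes the JobID as a hex string and falls back to an empty string when no job is attached, so the Thrift layer never sees a null. The round trip the new test depends on can be sketched with Flink's JobID API alone:

    import org.apache.flink.api.common.JobID

    // A freshly generated JobID stands in for the one a real statement would produce.
    val jobId: JobID = JobID.generate()

    // Engine side: hex-encode the id, or report "" (never null) when there is no job.
    val queryId: String = Option(jobId).map(_.toHexString).getOrElse("")

    // Client side: the hex string parses back into an equal JobID.
    assert(JobID.fromHexString(queryId) == jobId)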
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 75b22c93b..c08b7ffb8 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -19,10 +19,11 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.nio.file.Files
 import java.sql.DatabaseMetaData
-import java.util.UUID
+import java.util.{Properties, UUID}
 
 import scala.collection.JavaConverters._
 
+import org.apache.flink.api.common.JobID
 import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.hive.service.rpc.thrift._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
 import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
+import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.service.ServiceState._
@@ -974,4 +976,16 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       assertDefaultDatabase(client, "default_database", true)
     }
   }
+
+  test("get query id") {
+    val conn = new KyuubiConnection(jdbcUrl, new Properties())
+    val stmt = conn.createStatement()
+    stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
+    assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
+    stmt.executeQuery("insert into tbl_a values (1)")
+    val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
+    assert(queryId !== null)
+    // parse the string to check if it's valid Flink job id
+    assert(JobID.fromHexString(queryId) !== null)
+  }
 }


[incubator-kyuubi] 02/02: [KYUUBI #3549][FOLLOWUP] Simplify test

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git

commit 365469a8e1834f2654b0e2bdfff78fc2018a7a8b
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Sep 26 18:06:06 2022 +0800

    [KYUUBI #3549][FOLLOWUP] Simplify test
    
    Simplify the test by using the helper method `withJdbcStatement`.
    
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3556 from pan3793/followup.
    
    Closes #3549
    
    75103f85 [Cheng Pan] nit
    449a2563 [Cheng Pan] [KYUUBI #3549][FOLLOWUP] Simplify test
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../flink/operation/FlinkOperationSuite.scala      | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index c08b7ffb8..e8b3eefd9 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.nio.file.Files
 import java.sql.DatabaseMetaData
-import java.util.{Properties, UUID}
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
@@ -35,7 +35,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
 import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
-import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.service.ServiceState._
@@ -978,14 +978,14 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   }
 
   test("get query id") {
-    val conn = new KyuubiConnection(jdbcUrl, new Properties())
-    val stmt = conn.createStatement()
-    stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
-    assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
-    stmt.executeQuery("insert into tbl_a values (1)")
-    val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
-    assert(queryId !== null)
-    // parse the string to check if it's valid Flink job id
-    assert(JobID.fromHexString(queryId) !== null)
+    withJdbcStatement("tbl_a") { stmt =>
+      stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
+      assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
+      stmt.executeQuery("insert into tbl_a values (1)")
+      val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
+      assert(queryId !== null)
+      // parse the string to check if it's valid Flink job id
+      assert(JobID.fromHexString(queryId) !== null)
+    }
   }
 }
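Closing aside (not part of the commit): the withJdbcStatement helper used in the refactored test comes from Kyuubi's HiveJDBCTestHelper and is not shown in this diff. A hypothetical sketch of the pattern it implements, namely managing the connection and statement lifecycle and dropping the listed tables afterwards; the signature and details here are assumptions for illustration, not the real implementation:

    import java.sql.{DriverManager, Statement}

    // Hypothetical stand-in for the test helper: opens a connection, hands a
    // Statement to the test body, then drops the named tables and closes resources.
    def withJdbcStatement(jdbcUrl: String, tableNames: String*)(body: Statement => Unit): Unit = {
      val conn = DriverManager.getConnection(jdbcUrl)
      val stmt = conn.createStatement()
      try {
        body(stmt)
      } finally {
        tableNames.foreach(t => stmt.execute(s"drop table if exists $t"))
        stmt.close()
        conn.close()
      }
    }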