Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/03 13:51:32 UTC

[kyuubi] branch master updated: [KYUUBI #4495] Support Flink job management statements

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb444e2d [KYUUBI #4495] Support Flink job management statements
dcb444e2d is described below

commit dcb444e2d8e44b30eb6cad74ae8b75b2e54b52da
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Wed May 3 21:51:21 2023 +0800

    [KYUUBI #4495] Support Flink job management statements
    
    ### _Why are the changes needed?_
    
    Support Flink job management statements.
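
    For reference, the statements added here follow Flink 1.17's SQL syntax for
    job lifecycle management. A hedged sketch of how they look through a Kyuubi
    JDBC connection (the endpoint URL is a placeholder, not part of this patch;
    the statements and result columns mirror the test cases below):
    
    ```scala
    import java.sql.DriverManager
    
    // Placeholder endpoint; point this at a real Kyuubi server.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
    val stmt = conn.createStatement()
    
    // Result columns: job id, job name, status, start time.
    val jobs = stmt.executeQuery("SHOW JOBS")
    while (jobs.next()) {
      println(s"${jobs.getString(1)} ${jobs.getString(3)}")
    }
    
    // Stop a job by id, optionally taking a savepoint first.
    stmt.executeQuery("STOP JOB '<job id>'")
    stmt.executeQuery("STOP JOB '<job id>' WITH SAVEPOINT")
    ```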
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4774 from link3280/KYUUBI-4495.
    
    Closes #4495
    
    a4aaebcbb [Paul Lin] [KYUUBI #4495] Adjust the order of tests
    225a6cdbd [Paul Lin] [KYUUBI #4495] Increase the number of taskmanagers in the mini cluster
    67935ac24 [Paul Lin] [KYUUBI #4495] Wait for jobs to get ready for show job statements
    9c4ce1d6e [Paul Lin] [KYUUBI #4495] Fix show jobs assertion error
    ab3113cab [Paul Lin] [KYUUBI #4495] Support Flink job management statements
    
    Authored-by: Paul Lin <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/flink/WithFlinkSQLEngineLocal.scala     |  1 +
 .../engine/flink/WithFlinkTestResources.scala      |  5 +-
 .../flink/operation/FlinkOperationSuite.scala      | 70 +++++++++++++++++++++-
 3 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index 0001f31ae..92c1bcd83 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -184,6 +184,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
     val cfg = new MiniClusterConfiguration.Builder()
       .setConfiguration(flinkConfig)
       .setNumSlotsPerTaskManager(1)
+      .setNumTaskManagers(2)
       .build
     miniCluster = new MiniCluster(cfg)
     miniCluster.start()
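
The extra TaskManager above matters for the new test: with one slot per
TaskManager, a single TaskManager can only host the long-running streaming
INSERT job, leaving no slot for a concurrent query. A minimal standalone
sketch of the same setup, assuming flink-runtime's MiniCluster test
utilities are on the classpath:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}

    val cfg = new MiniClusterConfiguration.Builder()
      .setConfiguration(new Configuration())
      .setNumSlotsPerTaskManager(1)
      .setNumTaskManagers(2) // 2 TMs x 1 slot = 2 slots: both jobs can run
      .build
    val cluster = new MiniCluster(cfg)
    cluster.start()
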
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
index 3ea02774e..3b1d65cb2 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
@@ -41,6 +41,9 @@ trait WithFlinkTestResources {
     GENERATED_UDF_CLASS,
     GENERATED_UDF_CODE)
 
+  protected val savepointDir: File = Utils.createTempDir("savepoints").toFile
+
   protected val testExtraConf: Map[String, String] = Map(
-    "flink.pipeline.name" -> "test-job")
+    "flink.pipeline.name" -> "test-job",
+    "flink.state.savepoints.dir" -> savepointDir.toURI.toString)
 }
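
The new `flink.state.savepoints.dir` entry is what lets the suite run
`STOP JOB ... WITH SAVEPOINT`: Kyuubi forwards `flink.`-prefixed session
options to the engine with the prefix stripped, so Flink sees
`state.savepoints.dir` and uses it as the default savepoint target. A hedged
sketch of the equivalent plain-Flink setting (the local URI is only an
example; any DFS URI works in a real deployment):

    import org.apache.flink.configuration.Configuration

    val flinkConf = new Configuration()
    // Default directory for savepoints triggered without an explicit path.
    flinkConf.setString("state.savepoints.dir", "file:///tmp/savepoints")
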
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 00e26c528..39d17aa7b 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import java.nio.file.Paths
 import java.sql.DatabaseMetaData
 import java.util.UUID
 
@@ -33,6 +34,7 @@ import org.apache.kyuubi.engine.flink.{FlinkEngineUtils, WithFlinkTestResources}
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
+import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 
@@ -632,6 +634,62 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
     }
   }
 
+  test("execute statement - show/stop jobs") {
+    if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+      withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
+        withMultipleConnectionJdbcStatement()({ statement =>
+          statement.executeQuery(
+            "create table tbl_a (a int) with (" +
+              "'connector' = 'datagen', " +
+              "'rows-per-second'='10')")
+          statement.executeQuery("create table tbl_b (a int) with ('connector' = 'blackhole')")
+          val insertResult1 = statement.executeQuery("insert into tbl_b select * from tbl_a")
+          assert(insertResult1.next())
+          val jobId1 = insertResult1.getString(1)
+
+          Thread.sleep(5000)
+
+          val showResult = statement.executeQuery("show jobs")
+          val metadata = showResult.getMetaData
+          assert(metadata.getColumnName(1) === "job id")
+          assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
+          assert(metadata.getColumnName(2) === "job name")
+          assert(metadata.getColumnType(2) === java.sql.Types.VARCHAR)
+          assert(metadata.getColumnName(3) === "status")
+          assert(metadata.getColumnType(3) === java.sql.Types.VARCHAR)
+          assert(metadata.getColumnName(4) === "start time")
+          assert(metadata.getColumnType(4) === java.sql.Types.OTHER)
+
+          var isFound = false
+          while (showResult.next()) {
+            if (showResult.getString(1) === jobId1) {
+              isFound = true
+              assert(showResult.getString(2) === "test-job")
+              assert(showResult.getString(3) === "RUNNING")
+              assert(showResult.getObject(4).isInstanceOf[TimestampTZ])
+            }
+          }
+          assert(isFound)
+
+          val stopResult1 = statement.executeQuery(s"stop job '$jobId1'")
+          assert(stopResult1.next())
+          assert(stopResult1.getString(1) === "OK")
+
+          val selectResult = statement.executeQuery("select * from tbl_a")
+          val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId
+          assert(jobId2 !== null)
+          while (!selectResult.next()) {
+            Thread.sleep(1000L)
+          }
+          val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint")
+          assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path"))
+          assert(stopResult2.next())
+          assert(Paths.get(stopResult2.getString(1)).getFileName.toString.startsWith("savepoint-"))
+        })
+      }
+    }
+  }
+
   test("execute statement - select column name with dots") {
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("select 'tmp.hello'")
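
Two details of the show/stop test above are easy to miss: an INSERT returns
its Flink job ID as the first result column, while for a SELECT the ID is
only reachable via `KyuubiStatement#getQueryId`; and the polling loop simply
waits for the first row, confirming the job is RUNNING before it is stopped.
A condensed sketch of that pattern (connection and table wiring omitted):

    import org.apache.kyuubi.jdbc.hive.KyuubiStatement

    val rs = statement.executeQuery("select * from tbl_a")
    val jobId = statement.asInstanceOf[KyuubiStatement].getQueryId
    while (!rs.next()) Thread.sleep(1000L) // wait until the job emits a row
    val stop = statement.executeQuery(s"stop job '$jobId' with savepoint")
    // the single result column holds the savepoint path
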
@@ -994,7 +1052,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
       assert(metadata.getColumnName(1) === "job id")
       assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
       assert(resultSet.next())
-      assert(resultSet.getString(1).length == 32)
+      val jobId = resultSet.getString(1)
+      assert(jobId.length == 32)
+
+      if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+        val stopResult = statement.executeQuery(s"stop job '$jobId'")
+        assert(stopResult.next())
+        assert(stopResult.getString(1) === "OK")
+      }
     })
   }
 
@@ -1072,7 +1137,8 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
   test("ensure result max rows") {
     withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
       withJdbcStatement() { statement =>
-        statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
+        statement.execute("create table tbl_src (a bigint) with (" +
+          "'connector' = 'datagen', 'number-of-rows' = '1000')")
         val resultSet = statement.executeQuery(s"select a from tbl_src")
         var rows = 0
         while (resultSet.next()) {