You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2021/12/22 14:55:56 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1579] Implement basic ability of executing statement in Flink engine

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3673399  [KYUUBI #1579] Implement basic ability of executing statement in Flink engine
3673399 is described below

commit 367339932964102bf3111d2f37f4f89975b28ea6
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Dec 22 22:55:43 2021 +0800

    [KYUUBI #1579] Implement basic ability of executing statement in Flink engine
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1603 from yanghua/KYUUBI-1579.
    
    Closes #1579
    
    48db76b3 [Cheng Pan] cleanup
    36707516 [Cheng Pan] Address comments
    25ca5ae2 [yanghua] reduce code
    6f18a4a0 [yanghua] [KYUUBI #1579] Implement basic ability of executing statement
    
    Lead-authored-by: yanghua <ya...@gmail.com>
    Co-authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 externals/kyuubi-flink-sql-engine/pom.xml          |   6 +
 .../engine/flink/operation/ExecuteStatement.scala  | 155 +++++++++++++++++++++
 .../engine/flink/operation/FlinkOperation.scala    |  15 +-
 .../flink/operation/FlinkSQLOperationManager.scala |   5 +-
 .../apache/kyuubi/engine/flink/schema/RowSet.scala |   6 +
 .../flink/session/FlinkSQLSessionManager.scala     |  19 ++-
 .../engine/flink/session/FlinkSessionImpl.scala    |   5 +
 .../flink/operation/FlinkOperationSuite.scala      |  61 +++++++-
 pom.xml                                            |  20 +++
 9 files changed, 285 insertions(+), 7 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index a4c7c00..fd6f8b1 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -139,6 +139,12 @@
       <artifactId>jul-to-slf4j</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
new file mode 100644
index 0000000..818d168
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util
+import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
+import org.apache.kyuubi.operation.{OperationState, OperationType}
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
+
+/**
+ * Operation that executes a single Flink SQL query statement and materializes its result.
+ *
+ * The statement is parsed and executed through the Flink SQL client `Executor`; the result is
+ * polled with `snapshotResult` / `retrieveResultPage` until end-of-stream and then stored in
+ * `resultSet` for fetching.
+ *
+ * @param session        the Kyuubi session this operation belongs to
+ * @param statement      SQL text to execute; after parsing it is cast to a `QueryOperation`,
+ *                       so non-query statements currently fail
+ * @param shouldRunAsync when true, the statement runs on the session manager's background pool
+ * @param queryTimeout   timeout in seconds; a value <= 0 disables the timeout monitor
+ */
+class ExecuteStatement(
+    session: Session,
+    override val statement: String,
+    override val shouldRunAsync: Boolean,
+    queryTimeout: Long)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
+
+  private val operationLog: OperationLog =
+    OperationLog.createOperationLog(session, getHandle)
+
+  private var resultDescriptor: ResultDescriptor = _
+
+  private var columnInfos: util.List[ColumnInfo] = _
+
+  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+  /** Replaces the Flink executor used by this operation (tests inject a local executor). */
+  @VisibleForTesting
+  override def setExecutor(executor: Executor): Unit = {
+    this.executor = executor
+  }
+
+  /** Overrides the Flink executor session id the statement runs under. */
+  def setSessionId(sessionId: String): Unit = {
+    this.sessionId = sessionId
+  }
+
+  override protected def beforeRun(): Unit = {
+    OperationLog.setCurrentOperationLog(operationLog)
+    setState(OperationState.PENDING)
+    setHasResultSet(true)
+  }
+
+  override protected def afterRun(): Unit = {
+    OperationLog.removeCurrentOperationLog()
+  }
+
+  /**
+   * Starts the timeout monitor, then runs the statement either inline or on the session
+   * manager's background pool, depending on `shouldRunAsync`.
+   *
+   * @throws KyuubiSQLException if the background pool rejects the submission
+   */
+  override protected def runInternal(): Unit = {
+    addTimeoutMonitor()
+    if (shouldRunAsync) {
+      val asyncOperation = new Runnable {
+        override def run(): Unit = {
+          // Re-register the operation log on the background thread (beforeRun registered it
+          // on the submitting thread), then do the actual work here, not on the caller.
+          OperationLog.setCurrentOperationLog(operationLog)
+          executeStatement()
+        }
+      }
+
+      try {
+        val flinkSQLSessionManager = session.sessionManager
+        val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
+        setBackgroundHandle(backgroundHandle)
+      } catch {
+        case rejected: RejectedExecutionException =>
+          // executeStatement() will never run, so its finally block cannot stop the monitor.
+          statementTimeoutCleaner.foreach(_.shutdown())
+          setState(OperationState.ERROR)
+          val ke =
+            KyuubiSQLException("Error submitting query in background, query rejected", rejected)
+          setOperationException(ke)
+          throw ke
+      }
+    } else {
+      executeStatement()
+    }
+  }
+
+  /**
+   * Parses and executes `statement`, then polls the result until end-of-stream.
+   *
+   * On success the collected rows are stored in `resultSet` and the state becomes FINISHED.
+   * Failures are routed through `onError(cancel = true)`. The timeout monitor, if any, is
+   * shut down in every case.
+   */
+  private def executeStatement(): Unit = {
+    try {
+      setState(OperationState.RUNNING)
+
+      columnInfos = new util.ArrayList[ColumnInfo]
+
+      val operation = executor.parseStatement(sessionId, statement)
+      // Only query statements are supported for now; anything else fails this cast.
+      resultDescriptor = executor.executeQuery(sessionId, operation.asInstanceOf[QueryOperation])
+      resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
+        columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
+      }
+
+      val resultID = resultDescriptor.getResultId
+
+      val rows = new ArrayBuffer[Row]()
+      var loop = true
+      while (loop) {
+        Thread.sleep(50) // slow the processing down
+
+        val result = executor.snapshotResult(sessionId, resultID, 2)
+        result.getType match {
+          case TypedResult.ResultType.PAYLOAD =>
+            // Presumably each snapshot covers the whole materialized result, so replace the
+            // previously collected rows instead of appending to them.
+            rows.clear()
+            (1 to result.getPayload).foreach { page =>
+              rows ++= executor.retrieveResultPage(resultID, page).asScala
+            }
+          case TypedResult.ResultType.EOS => loop = false
+          case TypedResult.ResultType.EMPTY =>
+        }
+      }
+
+      resultSet = ResultSet.builder
+        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(columnInfos)
+        .data(rows.toArray[Row])
+        .build
+      setState(OperationState.FINISHED)
+    } catch {
+      onError(cancel = true)
+    } finally {
+      statementTimeoutCleaner.foreach(_.shutdown())
+    }
+  }
+
+  /** Schedules `cleanup(OperationState.TIMEOUT)` after `queryTimeout` seconds, if positive. */
+  private def addTimeoutMonitor(): Unit = {
+    if (queryTimeout > 0) {
+      val timeoutExecutor =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+      statementTimeoutCleaner = Some(timeoutExecutor)
+    }
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 2c61ab4..f6780a2 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
+import org.apache.flink.table.client.gateway.Executor
 import org.apache.flink.table.client.gateway.context.SessionContext
 import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
 
@@ -40,8 +41,20 @@ abstract class FlinkOperation(
     session: Session)
   extends AbstractOperation(opType, session) {
 
-  protected val sessionContext: SessionContext =
+  protected val sessionContext: SessionContext = {
     session.asInstanceOf[FlinkSessionImpl].getSessionContext
+  }
+
+  /**
+   * Flink SQL client executor used to run statements. It defaults to the executor owned by the
+   * session's `FlinkSQLSessionManager`, so operations created in production have a usable
+   * executor; tests may replace it through `setExecutor`.
+   */
+  protected var executor: Executor = {
+    session.asInstanceOf[FlinkSessionImpl].getExecutor
+  }
+
+  /** Replaces the executor used by this operation with the given one. */
+  protected def setExecutor(executor: Executor): Unit = {
+    this.executor = executor
+  }
+
+  /** Flink executor session id statements run under; defaults to the session handle string. */
+  protected var sessionId: String = {
+    session.asInstanceOf[FlinkSessionImpl].getSessionId
+  }
+
   protected var resultSet: ResultSet = _
 
   override protected def beforeRun(): Unit = {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index df83949..390817a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -28,7 +28,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
       session: Session,
       statement: String,
       runAsync: Boolean,
-      queryTimeout: Long): Operation = null
+      queryTimeout: Long): Operation = {
+    val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
+    addOperation(op)
+  }
 
   override def newGetTypeInfoOperation(session: Session): Operation = null
 
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
index 069ade7..3439805 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -113,6 +113,12 @@ object RowSet {
         tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
       }
       TColumnValue.stringVal(tStringValue)
+    } else if (logicalType.isInstanceOf[CharType]) {
+      val tStringValue = new TStringValue
+      if (row.getField(ordinal) != null) {
+        tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+      }
+      TColumnValue.stringVal(tStringValue)
     } else {
       val tStrValue = new TStringValue
       if (row.getField(ordinal) != null) {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 75a1b03..1ffc311 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,7 +17,9 @@
 
 package org.apache.kyuubi.engine.flink.session
 
+import org.apache.flink.table.client.gateway.Executor
 import org.apache.flink.table.client.gateway.context.DefaultContext
+import org.apache.flink.table.client.gateway.local.LocalExecutor
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
@@ -29,13 +31,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
   override protected def isServer: Boolean = false
 
   val operationManager = new FlinkSQLOperationManager()
+  /** Flink SQL client executor shared by all sessions of this manager (see FlinkSessionImpl). */
+  val executor: Executor = new LocalExecutor(engineContext)
+
+  /** Starts the session manager itself first, then the shared Flink executor. */
+  override def start(): Unit = {
+    super.start()
+    executor.start()
+  }
 
   override def openSession(
       protocol: TProtocolVersion,
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String]): SessionHandle = null
-
-  override def closeSession(sessionHandle: SessionHandle): Unit = {}
+      conf: Map[String, String]): SessionHandle = {
+    // NOTE(review): still a stub. It opens a Flink executor session with an empty id and
+    // returns null, so callers get no usable SessionHandle. closeSession below closes
+    // `sessionHandle.toString`, which does not match the "" id opened here.
+    // TODO confirm the intended Kyuubi-handle -> Flink-session-id mapping.
+    executor.openSession("")
+    null
+  }
+
+  /** Closes the Flink executor session keyed by the handle's string form. */
+  override def closeSession(sessionHandle: SessionHandle): Unit = {
+    executor.closeSession(sessionHandle.toString)
+  }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index ccb3ec9..fe97cda 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.engine.flink.session
 
+import org.apache.flink.table.client.gateway.Executor
 import org.apache.flink.table.client.gateway.context.SessionContext
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
@@ -36,4 +37,8 @@ class FlinkSessionImpl(
 
   def getSessionContext: SessionContext = sessionContext
 
+  /** Returns the Flink executor owned by this session's FlinkSQLSessionManager. */
+  def getExecutor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
+
+  /** Returns the id used for this session in the Flink executor: the handle's string form. */
+  def getSessionId: String = handle.toString
+
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index ce538ec..f581043 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -17,11 +17,17 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import java.net.URL
+import java.util
 import java.util.Collections
 
 import org.apache.flink.client.cli.DefaultCLI
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
 import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
+import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.test.util.MiniClusterWithClientResource
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
@@ -34,6 +40,27 @@ class FlinkOperationSuite extends KyuubiFunSuite {
   val user: String = Utils.currentUser
   val password = "anonymous"
 
+  val NUM_TMS = 2
+  val NUM_SLOTS_PER_TM = 2
+
+  /**
+   * Builds the Flink configuration for the test mini cluster: small managed memory,
+   * NUM_TMS task managers with NUM_SLOTS_PER_TM slots each, and web job submission disabled.
+   */
+  private def getConfig = {
+    val clusterConf = new Configuration
+    clusterConf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
+    clusterConf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
+    clusterConf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
+    clusterConf.setBoolean(WebOptions.SUBMIT_ENABLE, false)
+    clusterConf
+  }
+
+  // Flink mini cluster the tests execute against; started in beforeAll via `before()`.
+  // NOTE(review): no matching `after()` call is visible in this diff — confirm the cluster
+  // is torn down in afterAll.
+  val MINI_CLUSTER_RESOURCE =
+    new MiniClusterWithClientResource(
+      new MiniClusterResourceConfiguration.Builder()
+        .setConfiguration(getConfig)
+        .setNumberTaskManagers(NUM_TMS)
+        .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)
+
+  // Client of the mini cluster; assigned in beforeAll after the cluster is started.
+  var clusterClient: ClusterClient[_] = _
+
   var engineContext = new DefaultContext(
     Collections.emptyList(),
     new Configuration,
@@ -41,7 +68,24 @@ class FlinkOperationSuite extends KyuubiFunSuite {
   var sessionContext: SessionContext = _
   var flinkSession: FlinkSessionImpl = _
 
+  /** Creates a LocalExecutor with no extra dependencies and a fresh base configuration. */
+  private def createLocalExecutor: LocalExecutor =
+    createLocalExecutor(Collections.emptyList[URL], new Configuration)
+
+  /**
+   * Creates a LocalExecutor using the given dependencies.
+   *
+   * Note: `configuration` is mutated in place — the mini cluster client's Flink configuration
+   * is merged into it, presumably so that submitted jobs target the mini cluster.
+   */
+  private def createLocalExecutor(
+      dependencies: util.List[URL],
+      configuration: Configuration): LocalExecutor = {
+    configuration.addAll(clusterClient.getFlinkConfiguration)
+    new LocalExecutor(
+      new DefaultContext(dependencies, configuration, Collections.singletonList(new DefaultCLI)))
+  }
+
   override def beforeAll(): Unit = {
+    MINI_CLUSTER_RESOURCE.before()
+    clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
+
     sessionContext = SessionContext.create(engineContext, "test-session-id");
     val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
     flinkSQLSessionManager.initialize(KyuubiConf())
@@ -66,7 +110,20 @@ class FlinkOperationSuite extends KyuubiFunSuite {
 
     val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
     assert(1 == resultSet.getRowsSize)
-    assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog")
+    assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
+  }
+
+  // Runs a literal query synchronously against a mini-cluster-backed LocalExecutor and checks
+  // that a string value containing a dot round-trips through the result set.
+  test("execute statement - select column name with dots") {
+    val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
+    val executor = createLocalExecutor
+    val flinkSessionId = "test-session"
+    executor.openSession(flinkSessionId)
+    try {
+      executeStatementOp.setExecutor(executor)
+      executeStatementOp.setSessionId(flinkSessionId)
+      executeStatementOp.run()
+
+      val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
+      assert(1 == resultSet.getRowsSize)
+      assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
+    } finally {
+      // Release the executor session even when the assertions above fail.
+      executor.closeSession(flinkSessionId)
+    }
   }
 
 }
diff --git a/pom.xml b/pom.xml
index 4540f23..ad8fd09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1136,6 +1136,26 @@
                 <artifactId>flink-sql-client_${scala.binary.version}</artifactId>
                 <version>${flink.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-api</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j-impl</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
         </dependencies>
     </dependencyManagement>