You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2021/12/22 14:55:56 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1579] Implement basic ability of executing statement in Flink engine
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3673399 [KYUUBI #1579] Implement basic ability of executing statement in Flink engine
3673399 is described below
commit 367339932964102bf3111d2f37f4f89975b28ea6
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Dec 22 22:55:43 2021 +0800
[KYUUBI #1579] Implement basic ability of executing statement in Flink engine
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
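-->
The Flink SQL engine could not execute statements yet: `FlinkSQLOperationManager.newExecuteStatement` returned `null`. This change adds a basic `ExecuteStatement` operation that runs a query through Flink's `LocalExecutor` and returns its result rows.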
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1603 from yanghua/KYUUBI-1579.
Closes #1579
48db76b3 [Cheng Pan] cleanup
36707516 [Cheng Pan] Address comments
25ca5ae2 [yanghua] reduce code
6f18a4a0 [yanghua] [KYUUBI #1579] Implement basic ability of executing statement
Lead-authored-by: yanghua <ya...@gmail.com>
Co-authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
externals/kyuubi-flink-sql-engine/pom.xml | 6 +
.../engine/flink/operation/ExecuteStatement.scala | 155 +++++++++++++++++++++
.../engine/flink/operation/FlinkOperation.scala | 15 +-
.../flink/operation/FlinkSQLOperationManager.scala | 5 +-
.../apache/kyuubi/engine/flink/schema/RowSet.scala | 6 +
.../flink/session/FlinkSQLSessionManager.scala | 19 ++-
.../engine/flink/session/FlinkSessionImpl.scala | 5 +
.../flink/operation/FlinkOperationSuite.scala | 61 +++++++-
pom.xml | 20 +++
9 files changed, 285 insertions(+), 7 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index a4c7c00..fd6f8b1 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -139,6 +139,12 @@
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
new file mode 100644
index 0000000..818d168
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util
+import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
+import org.apache.kyuubi.operation.{OperationState, OperationType}
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
+
+/**
+ * Executes a single Flink SQL statement and materializes its complete result.
+ *
+ * The statement is parsed with the Flink [[Executor]] and executed as a [[QueryOperation]].
+ * Only queries are supported: any other statement fails the cast and is reported via
+ * `onError`. The result is then polled until end-of-stream before the operation is marked
+ * FINISHED.
+ *
+ * @param session        the Kyuubi session that owns this operation
+ * @param statement      the SQL text to execute
+ * @param shouldRunAsync when true, the statement runs on the session manager's background pool
+ * @param queryTimeout   timeout in seconds; a value <= 0 disables the timeout
+ */
+class ExecuteStatement(
+    session: Session,
+    override val statement: String,
+    override val shouldRunAsync: Boolean,
+    queryTimeout: Long)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
+
+  private val operationLog: OperationLog =
+    OperationLog.createOperationLog(session, getHandle)
+
+  private var resultDescriptor: ResultDescriptor = _
+
+  private var columnInfos: util.List[ColumnInfo] = _
+
+  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+  /** Overrides the Flink executor; widened to public so tests can inject their own. */
+  @VisibleForTesting
+  override def setExecutor(executor: Executor): Unit = {
+    this.executor = executor
+  }
+
+  /** Overrides the Flink session id the statement is executed under. */
+  def setSessionId(sessionId: String): Unit = {
+    this.sessionId = sessionId
+  }
+
+  override protected def beforeRun(): Unit = {
+    OperationLog.setCurrentOperationLog(operationLog)
+    setState(OperationState.PENDING)
+    setHasResultSet(true)
+  }
+
+  override protected def afterRun(): Unit = {
+    OperationLog.removeCurrentOperationLog()
+  }
+
+  override protected def runInternal(): Unit = {
+    addTimeoutMonitor()
+    if (shouldRunAsync) {
+      // The statement itself must run inside the background task. Running it before
+      // submission would block the caller until the query completes.
+      val asyncOperation = new Runnable {
+        override def run(): Unit = {
+          OperationLog.setCurrentOperationLog(operationLog)
+          executeStatement()
+        }
+      }
+
+      try {
+        val flinkSQLSessionManager = session.sessionManager
+        val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
+        setBackgroundHandle(backgroundHandle)
+      } catch {
+        case rejected: RejectedExecutionException =>
+          // executeStatement() never ran, so its finally block cannot stop the timeout timer.
+          shutdownTimeoutMonitor()
+          setState(OperationState.ERROR)
+          val ke =
+            KyuubiSQLException("Error submitting query in background, query rejected", rejected)
+          setOperationException(ke)
+          throw ke
+      }
+    } else {
+      executeStatement()
+    }
+  }
+
+  /** Parses, executes and fully fetches the statement, then publishes the result set. */
+  private def executeStatement(): Unit = {
+    try {
+      setState(OperationState.RUNNING)
+
+      columnInfos = new util.ArrayList[ColumnInfo]
+
+      val operation = executor.parseStatement(sessionId, statement)
+      resultDescriptor = executor.executeQuery(sessionId, operation.asInstanceOf[QueryOperation])
+      resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
+        columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
+      }
+
+      val rows = fetchAllRows(resultDescriptor.getResultId)
+
+      resultSet = ResultSet.builder
+        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(columnInfos)
+        .data(rows)
+        .build
+      setState(OperationState.FINISHED)
+    } catch {
+      onError(cancel = true)
+    } finally {
+      shutdownTimeoutMonitor()
+    }
+  }
+
+  /**
+   * Polls the result identified by `resultId` until Flink reports end-of-stream.
+   *
+   * Returns the rows of the last PAYLOAD snapshot. NOTE(review): the loop does not check
+   * for cancellation or timeout itself. Only EOS, or an exception such as an
+   * InterruptedException from Thread.sleep, ends it.
+   */
+  private def fetchAllRows(resultId: String): Array[Row] = {
+    val rows = new ArrayBuffer[Row]()
+    var loop = true
+    while (loop) {
+      Thread.sleep(ExecuteStatement.ResultPollIntervalMs) // slow the processing down
+
+      val result = executor.snapshotResult(sessionId, resultId, ExecuteStatement.ResultPageSize)
+      result.getType match {
+        case TypedResult.ResultType.PAYLOAD =>
+          // Every PAYLOAD is treated as a full snapshot: rebuild the buffer from page 1.
+          rows.clear()
+          (1 to result.getPayload).foreach { page =>
+            rows ++= executor.retrieveResultPage(resultId, page).asScala
+          }
+        case TypedResult.ResultType.EOS => loop = false
+        case TypedResult.ResultType.EMPTY =>
+      }
+    }
+    rows.toArray[Row]
+  }
+
+  /** Schedules a TIMEOUT cleanup after `queryTimeout` seconds when a timeout is configured. */
+  private def addTimeoutMonitor(): Unit = {
+    if (queryTimeout > 0) {
+      val timeoutExecutor =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+      statementTimeoutCleaner = Some(timeoutExecutor)
+    }
+  }
+
+  /** Stops the timeout scheduler, if one was started. */
+  private def shutdownTimeoutMonitor(): Unit = {
+    statementTimeoutCleaner.foreach(_.shutdown())
+  }
+}
+
+object ExecuteStatement {
+
+  /** Page size passed to `Executor.snapshotResult` when polling a result. */
+  private val ResultPageSize = 2
+
+  /** Delay between two result snapshots, in milliseconds. */
+  private val ResultPollIntervalMs = 50L
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 2c61ab4..f6780a2 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -21,6 +21,7 @@ import java.io.IOException
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
@@ -40,8 +41,20 @@ abstract class FlinkOperation(
session: Session)
extends AbstractOperation(opType, session) {
- protected val sessionContext: SessionContext =
+ protected val sessionContext: SessionContext = {
session.asInstanceOf[FlinkSessionImpl].getSessionContext
+ }
+
+  /**
+   * Flink executor used to parse and run statements.
+   *
+   * Defaults to the executor exposed by the owning session (`FlinkSessionImpl.getExecutor`),
+   * so operations created by the operation manager work without extra wiring. It used to be
+   * left `null` here.
+   */
+  protected var executor: Executor = session.asInstanceOf[FlinkSessionImpl].getExecutor
+
+  /** Replaces the Flink executor used by this operation (e.g. with a test executor). */
+  protected def setExecutor(executor: Executor): Unit = {
+    this.executor = executor
+  }
+
+  /** Flink session id statements are executed under; defaults to the Kyuubi session's id. */
+  protected var sessionId: String = {
+    session.asInstanceOf[FlinkSessionImpl].getSessionId
+  }
+
protected var resultSet: ResultSet = _
override protected def beforeRun(): Unit = {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index df83949..390817a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -28,7 +28,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
session: Session,
statement: String,
runAsync: Boolean,
- queryTimeout: Long): Operation = null
+ queryTimeout: Long): Operation = {
+ val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
+ addOperation(op)
+ }
override def newGetTypeInfoOperation(session: Session): Operation = null
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
index 069ade7..3439805 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -113,6 +113,12 @@ object RowSet {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
+ } else if (logicalType.isInstanceOf[CharType]) {
+ val tStringValue = new TStringValue
+ if (row.getField(ordinal) != null) {
+ tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+ }
+ TColumnValue.stringVal(tStringValue)
} else {
val tStrValue = new TStringValue
if (row.getField(ordinal) != null) {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 75a1b03..1ffc311 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,7 +17,9 @@
package org.apache.kyuubi.engine.flink.session
+import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.DefaultContext
+import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
@@ -29,13 +31,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false
val operationManager = new FlinkSQLOperationManager()
+ val executor: Executor = new LocalExecutor(engineContext) // Flink executor shared by every session of this manager; FlinkSessionImpl.getExecutor hands it to operations
+
+ override def start(): Unit = { // start the session manager itself, then the shared Flink executor
+ super.start()
+ executor.start()
+ }
override def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = null
-
- override def closeSession(sessionHandle: SessionHandle): Unit = {}
+ conf: Map[String, String]): SessionHandle = { // NOTE(review): still a stub - no session object is created and null is returned as the handle
+ executor.openSession("") // NOTE(review): always uses the fixed Flink session id "", which presumably collides on a second open and never matches the sessionHandle.toString used by closeSession; TODO open under the new handle's id
+ null
+ }
+
+ override def closeSession(sessionHandle: SessionHandle): Unit = {
+ executor.closeSession(sessionHandle.toString) // closes the Flink session keyed by the handle's string form; NOTE(review): does not delegate to super.closeSession - confirm session/operation cleanup happens elsewhere
+ }
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index ccb3ec9..fe97cda 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.flink.session
+import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@@ -36,4 +37,8 @@ class FlinkSessionImpl(
def getSessionContext: SessionContext = sessionContext
+  /** Flink executor owned by this session's [[FlinkSQLSessionManager]]. */
+  def getExecutor: Executor = {
+    val flinkManager = sessionManager.asInstanceOf[FlinkSQLSessionManager]
+    flinkManager.executor
+  }
+
+  /** Identifier of this session, taken from the string form of its Kyuubi handle. */
+  def getSessionId: String = handle.toString
+
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index ce538ec..f581043 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -17,11 +17,17 @@
package org.apache.kyuubi.engine.flink.operation
+import java.net.URL
+import java.util
import java.util.Collections
import org.apache.flink.client.cli.DefaultCLI
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
+import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
@@ -34,6 +40,27 @@ class FlinkOperationSuite extends KyuubiFunSuite {
val user: String = Utils.currentUser
val password = "anonymous"
+  val NUM_TMS = 2
+  val NUM_SLOTS_PER_TM = 2
+
+  /** Mini-cluster configuration: tiny managed memory, fixed TM/slot counts, no web submit. */
+  private def getConfig: Configuration = {
+    val conf = new Configuration
+    conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
+    conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
+    conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
+    conf.setBoolean(WebOptions.SUBMIT_ENABLE, false)
+    conf
+  }
+
+  /** Flink mini cluster whose client configuration is copied into the suite's local executors. */
+  val MINI_CLUSTER_RESOURCE = {
+    val resourceConfig = new MiniClusterResourceConfiguration.Builder()
+      .setConfiguration(getConfig)
+      .setNumberTaskManagers(NUM_TMS)
+      .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+      .build
+    new MiniClusterWithClientResource(resourceConfig)
+  }
+
+  var clusterClient: ClusterClient[_] = _
+
var engineContext = new DefaultContext(
Collections.emptyList(),
new Configuration,
@@ -41,7 +68,24 @@ class FlinkOperationSuite extends KyuubiFunSuite {
var sessionContext: SessionContext = _
var flinkSession: FlinkSessionImpl = _
+  /** Creates a local executor with no extra dependencies and an empty base configuration. */
+  private def createLocalExecutor: LocalExecutor =
+    createLocalExecutor(Collections.emptyList[URL], new Configuration)
+
+  /**
+   * Creates a [[LocalExecutor]] whose configuration includes the mini cluster's client settings.
+   *
+   * @param dependencies  dependency URLs passed to the executor's DefaultContext
+   * @param configuration base configuration; it is copied, never modified
+   * @return a new, not yet started local executor
+   */
+  private def createLocalExecutor(
+      dependencies: util.List[URL],
+      configuration: Configuration): LocalExecutor = {
+    // Work on a copy so the cluster settings do not leak into the caller's Configuration.
+    val effectiveConf = new Configuration(configuration)
+    effectiveConf.addAll(clusterClient.getFlinkConfiguration)
+    val defaultContext: DefaultContext = new DefaultContext(
+      dependencies,
+      effectiveConf,
+      Collections.singletonList(new DefaultCLI))
+    new LocalExecutor(defaultContext)
+  }
+
override def beforeAll(): Unit = {
+ MINI_CLUSTER_RESOURCE.before()
+ clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
+
sessionContext = SessionContext.create(engineContext, "test-session-id");
val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
flinkSQLSessionManager.initialize(KyuubiConf())
@@ -66,7 +110,20 @@ class FlinkOperationSuite extends KyuubiFunSuite {
val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
- assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog")
+ assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
+ }
+
+  test("execute statement - select column name with dots") {
+    val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
+    val executor = createLocalExecutor
+    val flinkSessionId = "test-session"
+    executor.openSession(flinkSessionId)
+    // Always release the Flink session, even when an assertion below fails.
+    try {
+      executeStatementOp.setExecutor(executor)
+      executeStatementOp.setSessionId(flinkSessionId)
+      executeStatementOp.run()
+
+      val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
+      assert(1 == resultSet.getRowsSize)
+      assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
+    } finally {
+      executor.closeSession(flinkSessionId)
+    }
+  }
}
diff --git a/pom.xml b/pom.xml
index 4540f23..ad8fd09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1136,6 +1136,26 @@
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>