Posted to commits@kyuubi.apache.org by ch...@apache.org on 2021/10/12 06:46:13 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1198] [FEATURE] Support incremental collection
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b1b7f25 [KYUUBI #1198] [FEATURE] Support incremental collection
b1b7f25 is described below
commit b1b7f25faf454518b8eaee0b874666b4aa065b93
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Oct 12 14:46:05 2021 +0800
[KYUUBI #1198] [FEATURE] Support incremental collection
### _Why are the changes needed?_
Support incremental collection ([SPARK-25224](https://issues.apache.org/jira/browse/SPARK-25224)).
This introduces a new conf: `kyuubi.operation.incremental.collect`.
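As a rough usage sketch (the JDBC URL, port, and driver setup below are illustrative assumptions, not part of this change), a client can toggle the behavior per session with a `SET` statement, which is how the new `KyuubiIncrementCollectSuite` exercises it:

```scala
import java.sql.DriverManager

// Hedged sketch: assumes a Kyuubi server listening on localhost:10009 and the
// Hive JDBC driver on the classpath; adjust the URL for a real deployment.
object ToggleIncrementalCollect {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
    val stmt = conn.createStatement()

    // Enable incremental collection for subsequent statements in this session.
    stmt.execute("SET kyuubi.operation.incremental.collect=true")

    val rs = stmt.executeQuery("SELECT * FROM VALUES (1), (2), (3) AS t(c1)")
    while (rs.next()) println(rs.getInt(1))

    rs.close(); stmt.close(); conn.close()
  }
}
```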
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1198 from pan3793/inc-col.
Closes #1198
946068e9 [Cheng Pan] Address comments
2798d0d8 [Cheng Pan] Correct conf doc
3720fd41 [Cheng Pan] Incremental collection
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../engine/spark/operation/ExecuteStatement.scala | 18 +++--
.../spark/operation/SparkSQLOperationManager.scala | 11 ++-
.../engine/spark/KyuubiStatementMonitorSuite.scala | 10 ---
.../spark/operation/SparkOperationSuite.scala | 10 ---
.../spark/kyuubi/SQLOperationListenerSuite.scala | 12 ++--
.../org/apache/kyuubi/config/KyuubiConf.scala | 9 +++
.../apache/kyuubi/operation/JDBCTestUtils.scala | 14 +++-
.../operation/KyuubiIncrementCollectSuite.scala | 82 ++++++++++++++++++++++
8 files changed, 132 insertions(+), 34 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 796d87b..2e2eb31 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -19,13 +19,16 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
+import scala.collection.JavaConverters._
+
import org.apache.spark.kyuubi.SQLOperationListener
+import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
+import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, IterableFetchIterator, KyuubiSparkUtil}
import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.OperationState.OperationState
@@ -38,7 +41,8 @@ class ExecuteStatement(
session: Session,
protected override val statement: String,
override val shouldRunAsync: Boolean,
- queryTimeout: Long)
+ queryTimeout: Long,
+ incrementalCollect: Boolean)
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
import org.apache.kyuubi.KyuubiSparkUtils._
@@ -88,11 +92,17 @@ class ExecuteStatement(
// TODO: Make it configurable
spark.sparkContext.addSparkListener(operationListener)
result = spark.sql(statement)
- // TODO( #921): COMPILED need consider eagerly executed commands
+ // TODO #921: COMPILED need consider eagerly executed commands
statementEvent.queryExecution = result.queryExecution.toString()
setState(OperationState.COMPILED)
debug(result.queryExecution)
- iter = new ArrayFetchIterator(result.collect())
+ iter = if (incrementalCollect) {
+ info("Execute in incremental collect mode")
+ new IterableFetchIterator[Row](result.toLocalIterator().asScala.toIterable)
+ } else {
+ info("Execute in full collect mode")
+ new ArrayFetchIterator(result.collect())
+ }
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index b34510b..ccda453 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY, OperationModes}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_INCREMENTAL_COLLECT, OPERATION_PLAN_ONLY, OperationModes}
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.operation.{Operation, OperationManager}
@@ -56,6 +56,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
def getOpenSparkSessionCount: Int = sessionToSpark.size()
private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
+ private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT)
override def newExecuteStatementOperation(
session: Session,
@@ -66,9 +67,13 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
val operationModeStr =
spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault).toUpperCase(Locale.ROOT)
+ val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
+ .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
val operation = OperationModes.withName(operationModeStr) match {
- case NONE => new ExecuteStatement(spark, session, statement, runAsync, queryTimeout)
- case mode => new PlanOnlyStatement(spark, session, statement, mode)
+ case NONE =>
+ new ExecuteStatement(spark, session, statement, runAsync, queryTimeout, incrementalCollect)
+ case mode =>
+ new PlanOnlyStatement(spark, session, statement, mode)
}
addOperation(operation)
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
index ea6a97c..4058557 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
@@ -20,8 +20,6 @@ package org.apache.kyuubi.engine.spark
import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift._
-import org.apache.hive.service.rpc.thrift.TCLIService.Iface
-import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.spark.scheduler.JobSucceeded
import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
@@ -75,12 +73,4 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
assert(jobIdToJobInfoMap.size() === 1)
}
}
-
- private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
- val req = new TGetOperationStatusReq(op)
- var state = client.GetOperationStatus(req).getOperationState
- while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) {
- state = client.GetOperationStatus(req).getOperationState
- }
- }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 8083c36..0e9e03d 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -30,8 +30,6 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hive.common.util.HiveVersionInfo
import org.apache.hive.service.cli.HiveSQLException
import org.apache.hive.service.rpc.thrift._
-import org.apache.hive.service.rpc.thrift.TCLIService.Iface
-import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.types._
@@ -495,14 +493,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveJDBCTests {
}
}
- private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
- val req = new TGetOperationStatusReq(op)
- var state = client.GetOperationStatus(req).getOperationState
- while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) {
- state = client.GetOperationStatus(req).getOperationState
- }
-
- }
test("basic open | execute | close") {
withThriftClient { client =>
val operationManager = engine.backendService.sessionManager.
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
index 2cc57d9..40c9747 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
@@ -43,12 +43,12 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with JDBCTestUtils {
fetchResultsReq.setFetchType(1.toShort)
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val resultsResp = client.FetchResults(fetchResultsReq)
- val toSeq = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala.toSeq
- assert(toSeq.exists(_.contains("started with 2 stages")))
- assert(toSeq.exists(_.contains("started with 1 tasks")))
- assert(toSeq.exists(_.contains("started with 3 tasks")))
- assert(toSeq.exists(_.contains("Finished stage:")))
- assert(toSeq.exists(_.contains("Job 0 succeeded")))
+ val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+ assert(logs.exists(_.contains("started with 2 stages")))
+ assert(logs.exists(_.contains("started with 1 tasks")))
+ assert(logs.exists(_.contains("started with 3 tasks")))
+ assert(logs.exists(_.contains("Finished stage:")))
+ assert(logs.exists(_.contains("Job 0 succeeded")))
}
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 29d53c6..3447a18 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -687,6 +687,15 @@ object KyuubiConf {
.checkValue(_ >= 1000, "must >= 1s if set")
.createOptional
+ val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+ buildConf("operation.incremental.collect")
+ .internal
+ .doc("When true, the executor side result will be sequentially calculated and returned to" +
+ " the Spark driver side.")
+ .version("1.4.0")
+ .booleanConf
+ .createWithDefault(false)
+
val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
buildConf("operation.log.dir.root")
.doc("Root directory for query operation log at server-side.")
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
index 1decd7b..4df5930 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
@@ -20,9 +20,12 @@ package org.apache.kyuubi.operation
import java.sql.{DriverManager, ResultSet, SQLException, Statement}
import java.util.Locale
-import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
+import org.apache.hive.service.rpc.thrift._
+import org.apache.hive.service.rpc.thrift.TCLIService.Iface
+import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.service.authentication.PlainSASLHelper
@@ -169,4 +172,13 @@ trait JDBCTestUtils extends KyuubiFunSuite {
assert(!rs.next())
assert(dbNames.size === count, "All expected schemas should be visited")
}
+
+ def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
+ val req = new TGetOperationStatusReq(op)
+ var state = client.GetOperationStatus(req).getOperationState
+ eventually(timeout(90.seconds), interval(100.milliseconds)) {
+ state = client.GetOperationStatus(req).getOperationState
+ assert(!Set(INITIALIZED_STATE, PENDING_STATE, RUNNING_STATE).contains(state))
+ }
+ }
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala
new file mode 100644
index 0000000..b7aa4f5
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hive.service.rpc.thrift._
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.WithKyuubiServer
+import org.apache.kyuubi.config.KyuubiConf
+
+class KyuubiIncrementCollectSuite extends WithKyuubiServer with JDBCTestUtils {
+
+ override protected val conf: KyuubiConf = KyuubiConf()
+ .set(KyuubiConf.OPERATION_INCREMENTAL_COLLECT, true)
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ test("change incremental collect mode using SET commands") {
+ val querySQL = "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"
+ withSessionHandle { (client, handle) =>
+ def execute(sql: String): TOperationHandle = {
+ val req = new TExecuteStatementReq()
+ req.setSessionHandle(handle)
+ req.setStatement(sql)
+ val execStmtResp = client.ExecuteStatement(req)
+ execStmtResp.getOperationHandle
+ }
+
+ def executeAndWait(sql: String): TOperationHandle = {
+ val opHandle = execute(sql)
+ waitForOperationToComplete(client, opHandle)
+ opHandle
+ }
+
+ def queryAndCheckLog(sql: String, checkedText: String): Unit = {
+ val opHandle = execute(sql)
+ val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
+ fetchResultsReq.setFetchType(1.toShort)
+ eventually(timeout(10.seconds), interval(100.milliseconds)) {
+ val resultsResp = client.FetchResults(fetchResultsReq)
+ val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+ assert(logs.exists(_ contains checkedText))
+ }
+ }
+
+ queryAndCheckLog(querySQL, "Execute in incremental collect mode")
+ executeAndWait("SET kyuubi.operation.incremental.collect=false")
+ queryAndCheckLog(querySQL, "Execute in full collect mode")
+ executeAndWait("SET kyuubi.operation.incremental.collect=true")
+ queryAndCheckLog(querySQL, "Execute in incremental collect mode")
+ }
+ }
+
+ test("incremental collect query result") {
+ withJdbcStatement() { statement =>
+ val rs = statement.executeQuery("SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")
+ val result = new ArrayBuffer[Int]
+ while (rs.next()) {
+ result += rs.getInt(1)
+ }
+ assert((Set(1, 2, 3) diff result.toSet).isEmpty)
+ }
+ }
+}
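For context on what the new `incrementalCollect` flag toggles inside `ExecuteStatement`, here is a minimal standalone sketch (not taken from this commit; the local `SparkSession` setup is an assumption) contrasting the two driver-side collection strategies:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SparkSession}

object IncrementalCollectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val df = spark.sql("SELECT * FROM VALUES (1), (2), (3) AS t(c1) DISTRIBUTE BY c1")

    // Full collect: every partition is materialized on the driver at once.
    val all: Array[Row] = df.collect()
    println(all.mkString(", "))

    // Incremental collect: partitions are computed and pulled one at a time,
    // bounding driver memory at the cost of sequential partition execution.
    val iter: Iterator[Row] = df.toLocalIterator().asScala
    iter.foreach(println)

    spark.stop()
  }
}
```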