You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2021/10/12 06:46:13 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1198] [FEATURE] Support incremental collection

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b1b7f25  [KYUUBI #1198] [FEATURE] Support incremental collection
b1b7f25 is described below

commit b1b7f25faf454518b8eaee0b874666b4aa065b93
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Oct 12 14:46:05 2021 +0800

    [KYUUBI #1198] [FEATURE] Support incremental collection
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Support incremental collection, [SPARK-25224](https://issues.apache.org/jira/browse/SPARK-25224)
    
    Introduce new conf: `kyuubi.operation.incremental.collect`
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #1198 from pan3793/inc-col.
    
    Closes #1198
    
    946068e9 [Cheng Pan] Address comments
    2798d0d8 [Cheng Pan] Correct conf doc
    3720fd41 [Cheng Pan] Incremental collection
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/spark/operation/ExecuteStatement.scala  | 18 +++--
 .../spark/operation/SparkSQLOperationManager.scala | 11 ++-
 .../engine/spark/KyuubiStatementMonitorSuite.scala | 10 ---
 .../spark/operation/SparkOperationSuite.scala      | 10 ---
 .../spark/kyuubi/SQLOperationListenerSuite.scala   | 12 ++--
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  9 +++
 .../apache/kyuubi/operation/JDBCTestUtils.scala    | 14 +++-
 .../operation/KyuubiIncrementCollectSuite.scala    | 82 ++++++++++++++++++++++
 8 files changed, 132 insertions(+), 34 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 796d87b..2e2eb31 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -19,13 +19,16 @@ package org.apache.kyuubi.engine.spark.operation
 
 import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.kyuubi.SQLOperationListener
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
+import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, IterableFetchIterator, KyuubiSparkUtil}
 import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent}
 import org.apache.kyuubi.operation.{OperationState, OperationType}
 import org.apache.kyuubi.operation.OperationState.OperationState
@@ -38,7 +41,8 @@ class ExecuteStatement(
     session: Session,
     protected override val statement: String,
     override val shouldRunAsync: Boolean,
-    queryTimeout: Long)
+    queryTimeout: Long,
+    incrementalCollect: Boolean)
   extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
 
   import org.apache.kyuubi.KyuubiSparkUtils._
@@ -88,11 +92,17 @@ class ExecuteStatement(
       // TODO: Make it configurable
       spark.sparkContext.addSparkListener(operationListener)
       result = spark.sql(statement)
-      // TODO( #921): COMPILED need consider eagerly executed commands
+      // TODO #921: COMPILED need consider eagerly executed commands
       statementEvent.queryExecution = result.queryExecution.toString()
       setState(OperationState.COMPILED)
       debug(result.queryExecution)
-      iter = new ArrayFetchIterator(result.collect())
+      iter = if (incrementalCollect) {
+        info("Execute in incremental collect mode")
+        new IterableFetchIterator[Row](result.toLocalIterator().asScala.toIterable)
+      } else {
+        info("Execute in full collect mode")
+        new ArrayFetchIterator(result.collect())
+      }
       setState(OperationState.FINISHED)
     } catch {
       onError(cancel = true)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index b34510b..ccda453 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY, OperationModes}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_INCREMENTAL_COLLECT, OPERATION_PLAN_ONLY, OperationModes}
 import org.apache.kyuubi.config.KyuubiConf.OperationModes._
 import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.operation.{Operation, OperationManager}
@@ -56,6 +56,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
   def getOpenSparkSessionCount: Int = sessionToSpark.size()
 
   private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
+  private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT)
 
   override def newExecuteStatementOperation(
       session: Session,
@@ -66,9 +67,13 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
 
     val operationModeStr =
       spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault).toUpperCase(Locale.ROOT)
+    val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
+      .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
     val operation = OperationModes.withName(operationModeStr) match {
-      case NONE => new ExecuteStatement(spark, session, statement, runAsync, queryTimeout)
-      case mode => new PlanOnlyStatement(spark, session, statement, mode)
+      case NONE =>
+        new ExecuteStatement(spark, session, statement, runAsync, queryTimeout, incrementalCollect)
+      case mode =>
+        new PlanOnlyStatement(spark, session, statement, mode)
     }
     addOperation(operation)
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
index ea6a97c..4058557 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala
@@ -20,8 +20,6 @@ package org.apache.kyuubi.engine.spark
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.hive.service.rpc.thrift._
-import org.apache.hive.service.rpc.thrift.TCLIService.Iface
-import org.apache.hive.service.rpc.thrift.TOperationState._
 import org.apache.spark.scheduler.JobSucceeded
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
@@ -75,12 +73,4 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
       assert(jobIdToJobInfoMap.size() === 1)
     }
   }
-
-  private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
-    val req = new TGetOperationStatusReq(op)
-    var state = client.GetOperationStatus(req).getOperationState
-    while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) {
-      state = client.GetOperationStatus(req).getOperationState
-    }
-  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 8083c36..0e9e03d 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -30,8 +30,6 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hive.common.util.HiveVersionInfo
 import org.apache.hive.service.cli.HiveSQLException
 import org.apache.hive.service.rpc.thrift._
-import org.apache.hive.service.rpc.thrift.TCLIService.Iface
-import org.apache.hive.service.rpc.thrift.TOperationState._
 import org.apache.spark.kyuubi.SparkContextHelper
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.types._
@@ -495,14 +493,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveJDBCTests {
     }
   }
 
-  private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
-    val req = new TGetOperationStatusReq(op)
-    var state = client.GetOperationStatus(req).getOperationState
-    while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) {
-      state = client.GetOperationStatus(req).getOperationState
-    }
-
-  }
   test("basic open | execute | close") {
     withThriftClient { client =>
       val operationManager = engine.backendService.sessionManager.
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
index 2cc57d9..40c9747 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala
@@ -43,12 +43,12 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with JDBCTestUtils {
       fetchResultsReq.setFetchType(1.toShort)
       eventually(timeout(90.seconds), interval(500.milliseconds)) {
         val resultsResp = client.FetchResults(fetchResultsReq)
-        val toSeq = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala.toSeq
-        assert(toSeq.exists(_.contains("started with 2 stages")))
-        assert(toSeq.exists(_.contains("started with 1 tasks")))
-        assert(toSeq.exists(_.contains("started with 3 tasks")))
-        assert(toSeq.exists(_.contains("Finished stage:")))
-        assert(toSeq.exists(_.contains("Job 0 succeeded")))
+        val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+        assert(logs.exists(_.contains("started with 2 stages")))
+        assert(logs.exists(_.contains("started with 1 tasks")))
+        assert(logs.exists(_.contains("started with 3 tasks")))
+        assert(logs.exists(_.contains("Finished stage:")))
+        assert(logs.exists(_.contains("Job 0 succeeded")))
       }
     }
   }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 29d53c6..3447a18 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -687,6 +687,15 @@ object KyuubiConf {
       .checkValue(_ >= 1000, "must >= 1s if set")
       .createOptional
 
+  val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
+    buildConf("operation.incremental.collect")
+      .internal
+      .doc("When true, the executor side result will be sequentially calculated and returned to" +
+        " the Spark driver side.")
+      .version("1.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
     buildConf("operation.log.dir.root")
       .doc("Root directory for query operation log at server-side.")
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
index 1decd7b..4df5930 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala
@@ -20,9 +20,12 @@ package org.apache.kyuubi.operation
 import java.sql.{DriverManager, ResultSet, SQLException, Statement}
 import java.util.Locale
 
-import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
+import org.apache.hive.service.rpc.thrift._
+import org.apache.hive.service.rpc.thrift.TCLIService.Iface
+import org.apache.hive.service.rpc.thrift.TOperationState._
 import org.apache.thrift.protocol.TBinaryProtocol
 import org.apache.thrift.transport.TSocket
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
 import org.apache.kyuubi.service.authentication.PlainSASLHelper
@@ -169,4 +172,13 @@ trait JDBCTestUtils extends KyuubiFunSuite {
     assert(!rs.next())
     assert(dbNames.size === count, "All expected schemas should be visited")
   }
+
+  def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
+    val req = new TGetOperationStatusReq(op)
+    var state = client.GetOperationStatus(req).getOperationState
+    eventually(timeout(90.seconds), interval(100.milliseconds)) {
+      state = client.GetOperationStatus(req).getOperationState
+      assert(!Set(INITIALIZED_STATE, PENDING_STATE, RUNNING_STATE).contains(state))
+    }
+  }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala
new file mode 100644
index 0000000..b7aa4f5
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hive.service.rpc.thrift._
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.WithKyuubiServer
+import org.apache.kyuubi.config.KyuubiConf
+
+class KyuubiIncrementCollectSuite extends WithKyuubiServer with JDBCTestUtils {
+
+  override protected val conf: KyuubiConf = KyuubiConf()
+    .set(KyuubiConf.OPERATION_INCREMENTAL_COLLECT, true)
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  test("change incremental collect mode using SET commands") {
+    val querySQL = "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"
+    withSessionHandle { (client, handle) =>
+      def execute(sql: String): TOperationHandle = {
+        val req = new TExecuteStatementReq()
+        req.setSessionHandle(handle)
+        req.setStatement(sql)
+        val execStmtResp = client.ExecuteStatement(req)
+        execStmtResp.getOperationHandle
+      }
+
+      def executeAndWait(sql: String): TOperationHandle = {
+        val opHandle = execute(sql)
+        waitForOperationToComplete(client, opHandle)
+        opHandle
+      }
+
+      def queryAndCheckLog(sql: String, checkedText: String): Unit = {
+        val opHandle = execute(sql)
+        val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
+        fetchResultsReq.setFetchType(1.toShort)
+        eventually(timeout(10.seconds), interval(100.milliseconds)) {
+          val resultsResp = client.FetchResults(fetchResultsReq)
+          val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+          assert(logs.exists(_ contains checkedText))
+        }
+      }
+
+      queryAndCheckLog(querySQL, "Execute in incremental collect mode")
+      executeAndWait("SET kyuubi.operation.incremental.collect=false")
+      queryAndCheckLog(querySQL, "Execute in full collect mode")
+      executeAndWait("SET kyuubi.operation.incremental.collect=true")
+      queryAndCheckLog(querySQL, "Execute in incremental collect mode")
+    }
+  }
+
+  test("incremental collect query result") {
+    withJdbcStatement() { statement =>
+      val rs = statement.executeQuery("SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1")
+      val result = new ArrayBuffer[Int]
+      while (rs.next()) {
+        result += rs.getInt(1)
+      }
+      assert((Set(1, 2, 3) diff result.toSet).isEmpty)
+    }
+  }
+}