Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/08/13 01:29:30 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #926] Support add repartition for OptimizedCreateHiveTableAsSelectCommand

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e2d67e  [KYUUBI #926] Support add repartition for OptimizedCreateHiveTableAsSelectCommand
7e2d67e is described below

commit 7e2d67ed618bff5ca58e35814c681a94e457e9f3
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Fri Aug 13 09:29:18 2021 +0800

    [KYUUBI #926] Support add repartition for OptimizedCreateHiveTableAsSelectCommand
    
    ### _Why are the changes needed?_
    We missed the CTAS node `OptimizedCreateHiveTableAsSelectCommand`. This PR adds repartition support for `OptimizedCreateHiveTableAsSelectCommand`, as sketched below.
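    
    A minimal sketch of what the rule now produces (assuming Spark 3.1's Catalyst API, with `numPartitions` standing in for the value of `KyuubiSQLConf.INSERT_REPARTITION_NUM`): the input query of the CTAS is wrapped in a `RepartitionByExpression` node, so the table write is preceded by a shuffle into a bounded number of partitions.
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression}
    
    // Non-partitioned CTAS: no repartition keys, so rows are simply
    // redistributed into `numPartitions` partitions before the write.
    def repartitionForWrite(query: LogicalPlan, numPartitions: Int): LogicalPlan =
      RepartitionByExpression(Seq.empty[Expression], query, numPartitions)
    ```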
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #926 from ulysses-you/cats.
    
    Closes #926
    
    5ff28eed [ulysses-you] nit
    f530fb0d [ulysses-you] style
    4bf0a1cf [ulysses-you] ctas
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: ulysses-you <ul...@gmail.com>
---
 .../org/apache/kyuubi/sql/KyuubiAnalysis.scala     | 22 +++++++++++++++++++++-
 .../apache/spark/sql/KyuubiExtensionSuite.scala    | 19 +++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala
index 3d9b408..b550505 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.IntegerType
 
@@ -147,6 +147,26 @@ case class RepartitionBeforeWriteHive(session: SparkSession) extends Rule[Logica
             conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)))
       }
 
+    case c @ OptimizedCreateHiveTableAsSelectCommand(table, query, _, _)
+      if query.resolved && table.bucketSpec.isEmpty && canInsertRepartitionByExpression(query) =>
+      val dynamicPartitionColumns =
+        query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
+      if (dynamicPartitionColumns.isEmpty) {
+        c.copy(query =
+          RepartitionByExpression(
+            Seq.empty,
+            query,
+            conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)))
+      } else {
+        val extended = dynamicPartitionColumns ++ dynamicPartitionExtraExpression(
+          conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM))
+        c.copy(query =
+          RepartitionByExpression(
+            extended,
+            query,
+            conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)))
+      }
+
     case _ => plan
   }
 }
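
For context, the new `OptimizedCreateHiveTableAsSelectCommand` case mirrors the existing Hive CTAS handling above it: when the target table has dynamic partition columns, the repartition is keyed on those columns plus extra expressions from the pre-existing `dynamicPartitionExtraExpression` helper (not shown in this hunk; judging by its config name, it spreads rows within each dynamic partition to avoid skew); otherwise the key list is empty. A self-contained sketch of that branching, with `extraExprs` as a hypothetical stand-in for the helper's output:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

// Sketch: pick the repartition keys for a Hive CTAS write.
def repartitionKeys(
    queryOutput: Seq[Attribute],
    partitionColumnNames: Seq[String],
    extraExprs: Seq[Expression]): Seq[Expression] = {
  // Dynamic partition columns are the query's output attributes whose
  // names match the table's partition columns.
  val dynamicPartitionColumns =
    queryOutput.filter(attr => partitionColumnNames.contains(attr.name))
  if (dynamicPartitionColumns.isEmpty) Seq.empty
  else dynamicPartitionColumns ++ extraExprs
}
```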
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index af42d8c..53d371f 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
 import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, CustomShuffleReaderExec, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.test.SQLTestUtils
@@ -399,4 +401,21 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
       }
     }
   }
+
+  test("OptimizedCreateHiveTableAsSelectCommand") {
+    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+      withTable("t") {
+        val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
+        val ctas = df.queryExecution.analyzed.collect {
+          case _: OptimizedCreateHiveTableAsSelectCommand => true
+        }
+        assert(ctas.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RepartitionByExpression => true
+        }
+        assert(repartition.size == 1)
+      }
+    }
+  }
 }
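
For completeness, a minimal sketch of exercising the rule end to end. It assumes the module's extension entry point is `org.apache.kyuubi.sql.KyuubiSparkSQLExtension`; the two Hive conversion configs are the ones toggled in the test above.

```scala
import org.apache.spark.sql.SparkSession

// Inject the Kyuubi extension so RepartitionBeforeWriteHive (including the
// new OptimizedCreateHiveTableAsSelectCommand case) runs during analysis.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  // The optimized Hive CTAS path is only taken when metastore conversion is on.
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .config("spark.sql.hive.convertMetastoreCtas", "true")
  .enableHiveSupport()
  .getOrCreate()

// The analyzed plan of this CTAS should now contain a RepartitionByExpression.
spark.sql("CREATE TABLE t STORED AS parquet AS SELECT 1 AS a")
```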