You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/08/13 01:29:30 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #926] Support add
repartition for OptimizedCreateHiveTableAsSelectCommand
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7e2d67e [KYUUBI #926] Support add repartition for OptimizedCreateHiveTableAsSelectCommand
7e2d67e is described below
commit 7e2d67ed618bff5ca58e35814c681a94e457e9f3
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Fri Aug 13 09:29:18 2021 +0800
[KYUUBI #926] Support add repartition for OptimizedCreateHiveTableAsSelectCommand
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' to your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
The `RepartitionBeforeWriteHive` rule did not handle one CTAS node, `OptimizedCreateHiveTableAsSelectCommand`. This PR makes the rule add a repartition before writing for `OptimizedCreateHiveTableAsSelectCommand` as well.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #926 from ulysses-you/cats.
Closes #926
5ff28eed [ulysses-you] nit
f530fb0d [ulysses-you] style
4bf0a1cf [ulysses-you] ctas
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: ulysses-you <ul...@gmail.com>
---
.../org/apache/kyuubi/sql/KyuubiAnalysis.scala | 22 +++++++++++++++++++++-
.../apache/spark/sql/KyuubiExtensionSuite.scala | 19 +++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala
index 3d9b408..b550505 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiAnalysis.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.IntegerType
@@ -147,6 +147,26 @@ case class RepartitionBeforeWriteHive(session: SparkSession) extends Rule[Logica
conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)))
}
+ case c @ OptimizedCreateHiveTableAsSelectCommand(table, query, _, _)
+ if query.resolved && table.bucketSpec.isEmpty && canInsertRepartitionByExpression(query) =>
+ val dynamicPartitionColumns =
+ query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
+ if (dynamicPartitionColumns.isEmpty) {
+ c.copy(query =
+ RepartitionByExpression(
+ Seq.empty,
+ query,
+ conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)))
+ } else {
+ val extended = dynamicPartitionColumns ++ dynamicPartitionExtraExpression(
+ conf.getConf(KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM))
+ c.copy(query =
+ RepartitionByExpression(
+ extended,
+ query,
+ conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM)))
+ }
+
case _ => plan
}
}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index af42d8c..53d371f 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, CustomShuffleReaderExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.test.SQLTestUtils
@@ -399,4 +401,21 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
}
}
}
+
+ test("OptimizedCreateHiveTableAsSelectCommand") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
+ HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+ withTable("t") {
+ val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
+ val ctas = df.queryExecution.analyzed.collect {
+ case _: OptimizedCreateHiveTableAsSelectCommand => true
+ }
+ assert(ctas.size == 1)
+ val repartition = df.queryExecution.analyzed.collect {
+ case _: RepartitionByExpression => true
+ }
+ assert(repartition.size == 1)
+ }
+ }
+ }
}