You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/10/21 02:44:06 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1268] Simplify
extension code with Spark3.2 and Spark3.1 by unify zorder code
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f0f065b [KYUUBI #1268] Simplify extension code with Spark3.2 and Spark3.1 by unify zorder code
f0f065b is described below
commit f0f065b983a15a577d21a3d39ae04dd7e12d058a
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Oct 21 10:43:53 2021 +0800
[KYUUBI #1268] Simplify extension code with Spark3.2 and Spark3.1 by unify zorder code
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Reduce file and code size to help maintain the extension.
- move all zorder code into common module
- add KyuubiSparkSQLCommonExtension in common module
### _How was this patch tested?_
Pass CI
Closes #1268 from ulysses-you/simplify-extension.
Closes #1268
0eeb7161 [ulysses-you] add link
c4d35c5c [ulysses-you] simplify
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 16 +-
.../sql/zorder/InsertZorderBeforeWriting.scala | 39 --
.../kyuubi/sql/zorder/OptimizeZorderCommand.scala | 30 --
.../sql/zorder/OptimizeZorderStatement.scala | 29 --
.../apache/kyuubi/sql/zorder/ResolveZorder.scala | 32 --
.../org/apache/kyuubi/sql/zorder/Zorder.scala | 23 -
.../zorder/ZorderSparkSqlExtensionsParser.scala | 26 -
.../kyuubi/sql/zorder/ZorderSqlAstBuilder.scala | 32 --
.../sql/InsertShuffleNodeBeforeJoinSuite.scala | 85 ----
dev/kyuubi-extension-spark-3-2/pom.xml | 1 +
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 17 +-
.../sql/zorder/InsertZorderBeforeWriting.scala | 39 --
.../kyuubi/sql/zorder/OptimizeZorderCommand.scala | 33 --
.../sql/zorder/OptimizeZorderStatement.scala | 31 --
.../apache/kyuubi/sql/zorder/ResolveZorder.scala | 32 --
.../org/apache/kyuubi/sql/zorder/Zorder.scala | 25 -
.../zorder/ZorderSparkSqlExtensionsParser.scala | 26 -
.../kyuubi/sql/zorder/ZorderSqlAstBuilder.scala | 32 --
.../scala/org/apache/spark/sql/ZorderSuite.scala | 533 ---------------------
.../sql/KyuubiSparkSQLCommonExtension.scala} | 21 +-
.../sql/zorder/InsertZorderBeforeWritingBase.scala | 19 +
.../sql/zorder/OptimizeZorderCommandBase.scala | 13 +
.../sql/zorder/OptimizeZorderStatementBase.scala | 11 +
.../kyuubi/sql/zorder/ResolveZorderBase.scala | 10 +
.../org/apache/kyuubi/sql/zorder/ZorderBase.scala | 5 +
.../ZorderSparkSqlExtensionsParserBase.scala | 6 +
.../sql/zorder/ZorderSqlAstBuilderBase.scala | 11 +
.../sql/InsertShuffleNodeBeforeJoinSuite.scala | 9 +-
.../scala/org/apache/spark/sql/ZorderSuite.scala | 12 +-
29 files changed, 105 insertions(+), 1093 deletions(-)
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index c6b4b59..f5a2ade 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxHivePartitionStrategy}
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
// scalastyle:off line.size.limit
/**
@@ -32,28 +31,15 @@ import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, Insert
// scalastyle:on line.size.limit
class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
- // inject zorder parser and related rules
- extensions.injectParser{ case (_, parser) => new ZorderSparkSqlExtensionsParser(parser) }
- extensions.injectResolutionRule(ResolveZorder)
-
+ KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
// a help rule for ForcedMaxOutputRowsRule
extensions.injectResolutionRule(MarkAggregateOrderRule)
- // Note that:
- // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
- // should be applied before
- // RepartitionBeforeWrite and RepartitionBeforeWriteHive
- // because we can only apply one of them (i.e. Global Sort or Repartition)
- extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
- extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(KyuubiSqlClassification)
extensions.injectPostHocResolutionRule(RepartitionBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(RepartitionBeforeWritingHive)
- extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
- extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
- extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
extensions.injectPlannerStrategy(MaxHivePartitionStrategy)
}
}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
deleted file mode 100644
index 7b4d215..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
- extends InsertZorderBeforeWritingDatasourceBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
- extends InsertZorderBeforeWritingHiveBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
deleted file mode 100644
index 401646b..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A runnable command for zorder, we delegate to real command to execute
- */
-case class OptimizeZorderCommand(
- catalogTable: CatalogTable,
- query: LogicalPlan)
- extends OptimizeZorderCommandBase {
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
deleted file mode 100644
index 55a5dcd..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A zorder statement that contains we parsed from SQL.
- * We should convert this plan to certain command at Analyzer.
- */
-case class OptimizeZorderStatement(
- tableIdentifier: Seq[String],
- query: LogicalPlan) extends OptimizeZorderStatementBase {
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
deleted file mode 100644
index 21888b3..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
- */
-case class ResolveZorder(session: SparkSession) extends ResolveZorderBase {
- override def buildOptimizeZorderCommand(
- catalogTable: CatalogTable, query: LogicalPlan): OptimizeZorderCommandBase = {
- OptimizeZorderCommand(catalogTable, query)
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
deleted file mode 100644
index c5916f5..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-case class Zorder(children: Seq[Expression]) extends ZorderBase {
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
deleted file mode 100644
index b37fb83..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-
-class ZorderSparkSqlExtensionsParser(
- override val delegate: ParserInterface)
- extends ZorderSparkSqlExtensionsParserBase {
- def astBuilder: ZorderSqlAstBuilderBase = new ZorderSqlAstBuilder
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
deleted file mode 100644
index 767cbd2..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-class ZorderSqlAstBuilder extends ZorderSqlAstBuilderBase {
- override def buildZorder(child: Seq[Expression]): ZorderBase = {
- Zorder(child)
- }
-
- override def buildOptimizeZorderStatement(
- tableIdentifier: Seq[String], query: LogicalPlan): OptimizeZorderStatementBase = {
- OptimizeZorderStatement(tableIdentifier, query)
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
deleted file mode 100644
index 7abeee4..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
-import org.apache.spark.sql.internal.SQLConf
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-class InsertShuffleNodeBeforeJoinSuite extends KyuubiSparkSQLExtensionTest {
- protected override def beforeAll(): Unit = {
- super.beforeAll()
- setupData()
- }
-
- test("force shuffle before join") {
- def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
- var expectedResult: Seq[Row] = Seq.empty
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- expectedResult = sql(sqlString).collect()
- }
- val df = sql(sqlString)
- checkAnswer(df, expectedResult)
- assert(
- collect(df.queryExecution.executedPlan) {
- case shuffle: ShuffleExchangeLike
- if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle
- }.size == num)
- }
-
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") {
- Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint =>
- // positive case
- checkShuffleNodeNum(
- s"""
- |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN t3 ON t1.c1 = t3.c1
- | """.stripMargin, 4)
-
- // negative case
- checkShuffleNodeNum(
- s"""
- |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN t3 ON t1.c2 = t3.c2
- | """.stripMargin, 4)
- }
-
- checkShuffleNodeNum(
- """
- |SELECT t1.c1, t2.c1, t3.c2 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN (
- | SELECT c2, count(*) FROM t1 GROUP BY c2
- | ) t3 ON t1.c1 = t3.c2
- | """.stripMargin, 5)
-
- checkShuffleNodeNum(
- """
- |SELECT t1.c1, t2.c1, t3.c1 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN (
- | SELECT c1, count(*) FROM t1 GROUP BY c1
- | ) t3 ON t1.c1 = t3.c1
- | """.stripMargin, 5)
- }
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/pom.xml b/dev/kyuubi-extension-spark-3-2/pom.xml
index 92d8064..5cf2f9b 100644
--- a/dev/kyuubi-extension-spark-3-2/pom.xml
+++ b/dev/kyuubi-extension-spark-3-2/pom.xml
@@ -23,6 +23,7 @@
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-parent</artifactId>
<version>1.4.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index f2b9ab1..2de572f 100644
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,8 +19,6 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
-
// scalastyle:off line.size.limit
/**
* Depend on Spark SQL Extension framework, we can use this extension follow steps
@@ -30,22 +28,9 @@ import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, Insert
// scalastyle:on line.size.limit
class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
- // inject zorder parser and related rules
- extensions.injectParser{ case (_, parser) => new ZorderSparkSqlExtensionsParser(parser) }
- extensions.injectResolutionRule(ResolveZorder)
+ KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
- // Note that:
- // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
- // should be applied before
- // RebalanceBeforeWritingDatasource and RebalanceBeforeWritingHive
- // because we can only apply one of them (i.e. Global Sort or Rebalance)
- extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
- extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive)
- extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
-
- extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
- extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
}
}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
deleted file mode 100644
index 7b4d215..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing datasource if the target table properties has zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
- extends InsertZorderBeforeWritingDatasourceBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's dynamic partition inserts ?
- * Insert zorder before writing hive if the target table properties has zorder properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
- extends InsertZorderBeforeWritingHiveBase {
- override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
deleted file mode 100644
index 9f0d58c..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A runnable command for zorder, we delegate to real command to execute
- */
-case class OptimizeZorderCommand(
- catalogTable: CatalogTable,
- query: LogicalPlan)
- extends OptimizeZorderCommandBase {
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
- copy(query = newChild)
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
deleted file mode 100644
index 785b5cb..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A zorder statement that contains we parsed from SQL.
- * We should convert this plan to certain command at Analyzer.
- */
-case class OptimizeZorderStatement(
- tableIdentifier: Seq[String],
- query: LogicalPlan) extends OptimizeZorderStatementBase {
- override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
- copy(query = newChild)
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
deleted file mode 100644
index 21888b3..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
- */
-case class ResolveZorder(session: SparkSession) extends ResolveZorderBase {
- override def buildOptimizeZorderCommand(
- catalogTable: CatalogTable, query: LogicalPlan): OptimizeZorderCommandBase = {
- OptimizeZorderCommand(catalogTable, query)
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
deleted file mode 100644
index be2f1ba..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-case class Zorder(children: Seq[Expression]) extends ZorderBase {
- override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
- copy(children = newChildren)
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
deleted file mode 100644
index b37fb83..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-
-class ZorderSparkSqlExtensionsParser(
- override val delegate: ParserInterface)
- extends ZorderSparkSqlExtensionsParserBase {
- def astBuilder: ZorderSqlAstBuilderBase = new ZorderSqlAstBuilder
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
deleted file mode 100644
index 767cbd2..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-class ZorderSqlAstBuilder extends ZorderSqlAstBuilderBase {
- override def buildZorder(child: Seq[Expression]): ZorderBase = {
- Zorder(child)
- }
-
- override def buildOptimizeZorderStatement(
- tableIdentifier: Seq[String], query: LogicalPlan): OptimizeZorderStatementBase = {
- OptimizeZorderStatement(tableIdentifier, query)
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
deleted file mode 100644
index 7c03abc..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++ /dev/null
@@ -1,533 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Expression, ExpressionEvalHelper, Literal, NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project, Sort}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
-import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
-
-import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
-import org.apache.kyuubi.sql.zorder.{OptimizeZorderCommandBase, Zorder}
-
-trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper {
-
- test("optimize unpartitioned table") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- withTable("up") {
- sql(s"DROP TABLE IF EXISTS up")
-
- val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
- Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
- Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
- Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
- sql(s"CREATE TABLE up (c1 INT, c2 INT, c3 INT)")
- sql(s"INSERT INTO TABLE up VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
- val e = intercept[KyuubiSQLExtensionException] {
- sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
- }
- assert(e.getMessage == "Filters are only supported for partitioned table")
-
- sql("OPTIMIZE up ZORDER BY c1, c2")
- val res = sql("SELECT c1, c2 FROM up").collect()
-
- assert(res.length == 16)
-
- for (i <- target.indices) {
- val t = target(i)
- val r = res(i)
- assert(t(0) == r.getInt(0))
- assert(t(1) == r.getInt(1))
- }
- }
- }
- }
-
- test("optimize partitioned table") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- withTable("p") {
- sql("DROP TABLE IF EXISTS p")
-
- val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
- Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
- Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
- Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-
- sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
- sql(s"OPTIMIZE p ZORDER BY c1, c2")
-
- val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
- val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
- assert(res1.length == 16)
- assert(res2.length == 16)
-
- for (i <- target.indices) {
- val t = target(i)
- val r1 = res1(i)
- assert(t(0) == r1.getInt(0))
- assert(t(1) == r1.getInt(1))
-
- val r2 = res2(i)
- assert(t(0) == r2.getInt(0))
- assert(t(1) == r2.getInt(1))
- }
- }
- }
- }
-
- test("optimize partitioned table with filters") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- withTable("p") {
- sql("DROP TABLE IF EXISTS p")
-
- val target1 = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
- Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
- Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
- Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
- val target2 = Seq(Seq(0, 0), Seq(0, 1), Seq(0, 2), Seq(0, 3),
- Seq(1, 0), Seq(1, 1), Seq(1, 2), Seq(1, 3),
- Seq(2, 0), Seq(2, 1), Seq(2, 2), Seq(2, 3),
- Seq(3, 0), Seq(3, 1), Seq(3, 2), Seq(3, 3))
- sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
- val e = intercept[KyuubiSQLExtensionException](
- sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
- )
- assert(e.getMessage == "Only partition column filters are allowed")
-
- sql(s"OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2")
-
- val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
- val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
- assert(res1.length == 16)
- assert(res2.length == 16)
-
- for (i <- target1.indices) {
- val t1 = target1(i)
- val r1 = res1(i)
- assert(t1(0) == r1.getInt(0))
- assert(t1(1) == r1.getInt(1))
-
- val t2 = target2(i)
- val r2 = res2(i)
- assert(t2(0) == r2.getInt(0))
- assert(t2(1) == r2.getInt(1))
- }
- }
- }
- }
-
- test("optimize zorder with datasource table") {
- // TODO remove this if we support datasource table
- withTable("t") {
- sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
- val msg = intercept[KyuubiSQLExtensionException] {
- sql("OPTIMIZE t ZORDER BY c1, c2")
- }.getMessage
- assert(msg.contains("only support hive table"))
- }
- }
-
- private def checkZorderTable(
- enabled: Boolean,
- cols: String,
- planHasRepartition: Boolean,
- resHasSort: Boolean): Unit = {
- def checkSort(plan: LogicalPlan): Unit = {
- assert(plan.isInstanceOf[Sort] === resHasSort)
- if (plan.isInstanceOf[Sort]) {
- val colArr = cols.split(",")
- val refs = if (colArr.length == 1) {
- plan.asInstanceOf[Sort].order.head
- .child.asInstanceOf[AttributeReference] :: Nil
- } else {
- plan.asInstanceOf[Sort].order.head
- .child.asInstanceOf[Zorder].children.map(_.references.head)
- }
- assert(refs.size === colArr.size)
- refs.zip(colArr).foreach { case (ref, col) =>
- assert(ref.name === col.trim)
- }
- }
- }
-
- val repartition = if (planHasRepartition) {
- "/*+ repartition */"
- } else {
- ""
- }
- withSQLConf("spark.sql.shuffle.partitions" -> "1") {
- // hive
- withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
- withTable("zorder_t1", "zorder_t2_true", "zorder_t2_false") {
- sql(
- s"""
- |CREATE TABLE zorder_t1 (c1 int, c2 string, c3 long, c4 double) STORED AS PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |""".stripMargin)
- val df1 = sql(s"""
- |INSERT INTO TABLE zorder_t1
- |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
- |""".stripMargin)
- assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHiveTable])
- checkSort(df1.queryExecution.analyzed.children.head)
-
- Seq("true", "false").foreach { optimized =>
- withSQLConf("spark.sql.hive.convertMetastoreCtas" -> optimized,
- "spark.sql.hive.convertMetastoreParquet" -> optimized) {
- val df2 =
- sql(
- s"""
- |CREATE TABLE zorder_t2_$optimized STORED AS PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |
- |SELECT $repartition * FROM
- |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
- |""".stripMargin)
- if (optimized.toBoolean) {
- assert(df2.queryExecution.analyzed
- .isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
- } else {
- assert(df2.queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
- }
- checkSort(df2.queryExecution.analyzed.children.head)
- }
- }
- }
- }
-
- // datasource
- withTable("zorder_t3", "zorder_t4") {
- sql(
- s"""
- |CREATE TABLE zorder_t3 (c1 int, c2 string, c3 long, c4 double) USING PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |""".stripMargin)
- val df1 = sql(s"""
- |INSERT INTO TABLE zorder_t3
- |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
- |""".stripMargin)
- assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHadoopFsRelationCommand])
- checkSort(df1.queryExecution.analyzed.children.head)
-
- val df2 =
- sql(
- s"""
- |CREATE TABLE zorder_t4 USING PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |
- |SELECT $repartition * FROM
- |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
- |""".stripMargin)
- assert(df2.queryExecution.analyzed.isInstanceOf[CreateDataSourceTableAsSelectCommand])
- checkSort(df2.queryExecution.analyzed.children.head)
- }
- }
- }
-
- test("Support insert zorder by table properties") {
- withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "false") {
- checkZorderTable(true, "c1", false, false)
- checkZorderTable(false, "c1", false, false)
- }
- withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "true") {
- checkZorderTable(true, "", false, false)
- checkZorderTable(true, "c5", false, false)
- checkZorderTable(true, "c1,c5", false, false)
- checkZorderTable(false, "c3", false, false)
- checkZorderTable(true, "c3", true, false)
- checkZorderTable(true, "c3", false, true)
- checkZorderTable(true, "c2,c4", false, true)
- checkZorderTable(true, "c4, c2, c1, c3", false, true)
- }
- }
-
- test("zorder: check unsupported data type") {
- def checkZorderPlan(zorder: Expression): Unit = {
- val msg = intercept[AnalysisException] {
- val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
- spark.sessionState.analyzer.checkAnalysis(plan)
- }.getMessage
- assert(msg.contains("Unsupported z-order type: void"))
- }
-
- checkZorderPlan(Zorder(Seq(Literal(null, NullType))))
- checkZorderPlan(Zorder(Seq(Literal(1, IntegerType), Literal(null, NullType))))
- }
-
- test("zorder: check supported data type") {
- val children = Seq(
- Literal.create(false, BooleanType),
- Literal.create(null, BooleanType),
- Literal.create(1.toByte, ByteType),
- Literal.create(null, ByteType),
- Literal.create(1.toShort, ShortType),
- Literal.create(null, ShortType),
- Literal.create(1, IntegerType),
- Literal.create(null, IntegerType),
- Literal.create(1L, LongType),
- Literal.create(null, LongType),
- Literal.create(1f, FloatType),
- Literal.create(null, FloatType),
- Literal.create(1d, DoubleType),
- Literal.create(null, DoubleType),
- Literal.create("1", StringType),
- Literal.create(null, StringType),
- Literal.create(1L, TimestampType),
- Literal.create(null, TimestampType),
- Literal.create(1, DateType),
- Literal.create(null, DateType),
- Literal.create(BigDecimal(1, 1), DecimalType(1, 1)),
- Literal.create(null, DecimalType(1, 1))
- )
- val zorder = Zorder(children)
- val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
- spark.sessionState.analyzer.checkAnalysis(plan)
- assert(zorder.foldable)
-
-// // scalastyle:off
-// val resultGen = org.apache.commons.codec.binary.Hex.encodeHex(
-// zorder.eval(InternalRow.fromSeq(children)).asInstanceOf[Array[Byte]], false)
-// resultGen.grouped(2).zipWithIndex.foreach { case (char, i) =>
-// print("0x" + char(0) + char(1) + ", ")
-// if ((i + 1) % 10 == 0) {
-// println()
-// }
-// }
-// // scalastyle:on
-
- val expected = Array(
- 0xFB, 0xEA, 0xAA, 0xBA, 0xAE, 0xAB, 0xAA, 0xEA, 0xBA, 0xAE,
- 0xAB, 0xAA, 0xEA, 0xBA, 0xA6, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
- 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
- 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
- 0xBA, 0xBB, 0xAA, 0xAA, 0xAA, 0xBA, 0xAA, 0xBA, 0xAA, 0xBA,
- 0xAA, 0xBA, 0xAA, 0xBA, 0xAA, 0xBA, 0xAA, 0x9A, 0xAA, 0xAA,
- 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
- 0xAA, 0xAA, 0xAA, 0xAA, 0xEA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
- 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
- 0xAA, 0xAA, 0xBE, 0xAA, 0xAA, 0x8A, 0xBA, 0xAA, 0x2A, 0xEA,
- 0xA8, 0xAA, 0xAA, 0xA2, 0xAA, 0xAA, 0x8A, 0xAA, 0xAA, 0x2F,
- 0xEB, 0xFC)
- .map(_.toByte)
- checkEvaluation(zorder, expected, InternalRow.fromSeq(children))
- }
-
- private def checkSort(input: DataFrame, expected: Seq[Row], dataType: Array[DataType]): Unit = {
- withTempDir { dir =>
- input.repartition(3).write.mode("overwrite").format("parquet").save(dir.getCanonicalPath)
- val df = spark.read.format("parquet")
- .load(dir.getCanonicalPath)
- .repartition(1)
- assert(df.schema.fields.map(_.dataType).sameElements(dataType))
- val exprs = Seq("c1", "c2").map(col).map(_.expr)
- val sortOrder = SortOrder(Zorder(exprs), Ascending, NullsLast, Seq.empty)
- val zorderSort = Sort(Seq(sortOrder), true, df.logicalPlan)
- val result = Dataset.ofRows(spark, zorderSort)
- checkAnswer(result, expected)
- }
- }
-
- test("sort with zorder -- int column") {
- // TODO: add more datatype unit test
- val session = spark
- import session.implicits._
- // generate 4 * 4 matrix
- val len = 3
- val input = spark.range(len + 1).selectExpr("cast(id as int) as c1")
- .select($"c1", explode(sequence(lit(0), lit(len))) as "c2")
- val expected =
- Row(0, 0) :: Row(1, 0) :: Row(0, 1) :: Row(1, 1) ::
- Row(2, 0) :: Row(3, 0) :: Row(2, 1) :: Row(3, 1) ::
- Row(0, 2) :: Row(1, 2) :: Row(0, 3) :: Row(1, 3) ::
- Row(2, 2) :: Row(3, 2) :: Row(2, 3) :: Row(3, 3) :: Nil
- checkSort(input, expected, Array(IntegerType, IntegerType))
-
- // contains null value case.
- val nullDF = spark.range(1).selectExpr("cast(null as int) as c1")
- val input2 = spark.range(len).selectExpr("cast(id as int) as c1")
- .union(nullDF)
- .select(
- $"c1",
- explode(concat(sequence(lit(0), lit(len - 1)), array(lit(null)))) as "c2")
- val expected2 = Row(0, 0) :: Row(1, 0) :: Row(0, 1) :: Row(1, 1) ::
- Row(2, 0) :: Row(2, 1) :: Row(0, 2) :: Row(1, 2) ::
- Row(2, 2) :: Row(null, 0) :: Row(null, 1) :: Row(null, 2) ::
- Row(0, null) :: Row(1, null) :: Row(2, null) :: Row(null, null) :: Nil
- checkSort(input2, expected2, Array(IntegerType, IntegerType))
- }
-
- test("sort with zorder -- string column") {
- val schema = StructType(StructField("c1", StringType) :: StructField("c2", StringType) :: Nil)
- val rdd = spark.sparkContext.parallelize(Seq(
- Row("a", "a"), Row("a", "b"), Row("a", "c"), Row("a", "d"),
- Row("b", "a"), Row("b", "b"), Row("b", "c"), Row("b", "d"),
- Row("c", "a"), Row("c", "b"), Row("c", "c"), Row("c", "d"),
- Row("d", "a"), Row("d", "b"), Row("d", "c"), Row("d", "d")))
- val input = spark.createDataFrame(rdd, schema)
- val expected = Row("a", "a") :: Row("b", "a") :: Row("c", "a") :: Row("a", "b") ::
- Row("a", "c") :: Row("b", "b") :: Row("c", "b") :: Row("b", "c") ::
- Row("c", "c") :: Row("d", "a") :: Row("d", "b") :: Row("d", "c") ::
- Row("a", "d") :: Row("b", "d") :: Row("c", "d") :: Row("d", "d") :: Nil
- checkSort(input, expected, Array(StringType, StringType))
-
- val rdd2 = spark.sparkContext.parallelize(Seq(
- Row(null, "a"), Row("a", "b"), Row("a", "c"), Row("a", null),
- Row("b", "a"), Row(null, "b"), Row("b", null), Row("b", "d"),
- Row("c", "a"), Row("c", null), Row(null, "c"), Row("c", "d"),
- Row("d", null), Row("d", "b"), Row("d", "c"), Row(null, "d"), Row(null, null)))
- val input2 = spark.createDataFrame(rdd2, schema)
- val expected2 = Row("b", "a") :: Row("c", "a") :: Row("a", "b") :: Row("a", "c") ::
- Row("d", "b") :: Row("d", "c") :: Row("b", "d") :: Row("c", "d") ::
- Row(null, "a") :: Row(null, "b") :: Row(null, "c") :: Row(null, "d") ::
- Row("a", null) :: Row("b", null) :: Row("c", null) :: Row("d", null) ::
- Row(null, null) :: Nil
- checkSort(input2, expected2, Array(StringType, StringType))
- }
-
- test("test special value of short int long type") {
- val df1 = spark.createDataFrame(Seq(
- (-1, -1L),
- (Int.MinValue, Int.MinValue.toLong),
- (1, 1L),
- (Int.MaxValue - 1, Int.MaxValue.toLong),
- (Int.MaxValue - 1, Int.MaxValue.toLong - 1),
- (Int.MaxValue, Int.MaxValue.toLong + 1),
- (Int.MaxValue, Int.MaxValue.toLong))).toDF("c1", "c2")
- val expected1 =
- Row(Int.MinValue, Int.MinValue.toLong) ::
- Row(-1, -1L) ::
- Row(1, 1L) ::
- Row(Int.MaxValue - 1, Int.MaxValue.toLong - 1) ::
- Row(Int.MaxValue - 1, Int.MaxValue.toLong) ::
- Row(Int.MaxValue, Int.MaxValue.toLong) ::
- Row(Int.MaxValue, Int.MaxValue.toLong + 1) :: Nil
- checkSort(df1, expected1, Array(IntegerType, LongType))
-
- val df2 = spark.createDataFrame(Seq(
- (-1, -1.toShort),
- (Short.MinValue.toInt, Short.MinValue),
- (1, 1.toShort),
- (Short.MaxValue.toInt, (Short.MaxValue - 1).toShort),
- (Short.MaxValue.toInt + 1, (Short.MaxValue - 1).toShort),
- (Short.MaxValue.toInt, Short.MaxValue),
- (Short.MaxValue.toInt + 1, Short.MaxValue))).toDF("c1", "c2")
- val expected2 =
- Row(Short.MinValue.toInt, Short.MinValue) ::
- Row(-1, -1.toShort) ::
- Row(1, 1.toShort) ::
- Row(Short.MaxValue.toInt, Short.MaxValue - 1) ::
- Row(Short.MaxValue.toInt, Short.MaxValue) ::
- Row(Short.MaxValue.toInt + 1, Short.MaxValue - 1) ::
- Row(Short.MaxValue.toInt + 1, Short.MaxValue) :: Nil
- checkSort(df2, expected2, Array(IntegerType, ShortType))
-
- val df3 = spark.createDataFrame(Seq(
- (-1L, -1.toShort),
- (Short.MinValue.toLong, Short.MinValue),
- (1L, 1.toShort),
- (Short.MaxValue.toLong, (Short.MaxValue - 1).toShort),
- (Short.MaxValue.toLong + 1, (Short.MaxValue - 1).toShort),
- (Short.MaxValue.toLong, Short.MaxValue),
- (Short.MaxValue.toLong + 1, Short.MaxValue))).toDF("c1", "c2")
- val expected3 =
- Row(Short.MinValue.toLong, Short.MinValue) ::
- Row(-1L, -1.toShort) ::
- Row(1L, 1.toShort) ::
- Row(Short.MaxValue.toLong, Short.MaxValue - 1) ::
- Row(Short.MaxValue.toLong, Short.MaxValue) ::
- Row(Short.MaxValue.toLong + 1, Short.MaxValue - 1) ::
- Row(Short.MaxValue.toLong + 1, Short.MaxValue) :: Nil
- checkSort(df3, expected3, Array(LongType, ShortType))
- }
-
- test("skip zorder if only requires one column") {
- withTable("t") {
- withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
- sql("CREATE TABLE t (c1 int, c2 string) stored as parquet")
- val order1 = sql("OPTIMIZE t ZORDER BY c1").queryExecution.analyzed
- .asInstanceOf[OptimizeZorderCommandBase].query.asInstanceOf[Sort].order.head.child
- assert(!order1.isInstanceOf[Zorder])
- assert(order1.isInstanceOf[AttributeReference])
- }
- }
- }
-}
-
-class ZorderWithCodegenEnabledSuite extends ZorderSuite {
- override def sparkConf(): SparkConf = {
- val conf = super.sparkConf
- conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
- conf
- }
-}
-
-class ZorderWithCodegenDisabledSuite extends ZorderSuite {
- override def sparkConf(): SparkConf = {
- val conf = super.sparkConf
- conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
- conf.set(SQLConf.CODEGEN_FACTORY_MODE.key, "NO_CODEGEN")
- conf
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
similarity index 75%
copy from dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
copy to dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index f2b9ab1..a6b4f86 100644
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -21,15 +21,14 @@ import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
-// scalastyle:off line.size.limit
-/**
- * Depend on Spark SQL Extension framework, we can use this extension follow steps
- * 1. move this jar into $SPARK_HOME/jars
- * 2. add config into `spark-defaults.conf`: `spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension`
- */
-// scalastyle:on line.size.limit
-class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
+class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
+ KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
+ }
+}
+
+object KyuubiSparkSQLCommonExtension {
+ def injectCommonExtensions(extensions: SparkSessionExtensions): Unit = {
// inject zorder parser and related rules
extensions.injectParser{ case (_, parser) => new ZorderSparkSqlExtensionsParser(parser) }
extensions.injectResolutionRule(ResolveZorder)
@@ -37,12 +36,10 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
// Note that:
// InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
// should be applied before
- // RebalanceBeforeWritingDatasource and RebalanceBeforeWritingHive
- // because we can only apply one of them (i.e. Global Sort or Rebalance)
+ // RepartitionBeforeWriting and RebalanceBeforeWriting
+ // because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
- extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource)
- extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
index 974e04b..e76e81f 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.sql.zorder
import java.util.Locale
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Repartition, RepartitionByExpression, Sort}
@@ -161,3 +162,21 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
}
}
}
+
+/**
+ * TODO: shall we forbid zorder for dynamic partition inserts?
+ * Insert zorder before writing datasource if the target table properties contain zorder properties
+ */
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+ extends InsertZorderBeforeWritingDatasourceBase {
+ override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
+}
+
+/**
+ * TODO: shall we forbid zorder for dynamic partition inserts?
+ * Insert zorder before writing hive if the target table properties contain zorder properties
+ */
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+ extends InsertZorderBeforeWritingHiveBase {
+ override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
+}
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala
index 262964a..c4f52fb 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.sql.zorder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
@@ -64,3 +65,15 @@ abstract class OptimizeZorderCommandBase extends DataWritingCommand {
Seq.empty
}
}
+
+/**
+ * A runnable command for zorder; we delegate to the real command to execute it
+ */
+case class OptimizeZorderCommand(
+ catalogTable: CatalogTable,
+ query: LogicalPlan)
+ extends OptimizeZorderCommandBase {
+ protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+ copy(query = newChild)
+ }
+}
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala
index f9201ab..a9bb5a5 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala
@@ -30,3 +30,14 @@ abstract class OptimizeZorderStatementBase extends UnaryNode {
override def child: LogicalPlan = query
override def output: Seq[Attribute] = child.output
}
+
+/**
+ * A zorder statement that we parsed from SQL.
+ * We should convert this plan to a concrete command in the Analyzer.
+ */
+case class OptimizeZorderStatement(
+ tableIdentifier: Seq[String],
+ query: LogicalPlan) extends OptimizeZorderStatementBase {
+ protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(query = newChild)
+}
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala
index 1fee20c..d839c02 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala
@@ -65,3 +65,13 @@ abstract class ResolveZorderBase extends Rule[LogicalPlan] {
case _ => plan
}
}
+
+/**
+ * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
+ */
+case class ResolveZorder(session: SparkSession) extends ResolveZorderBase {
+ override def buildOptimizeZorderCommand(
+ catalogTable: CatalogTable, query: LogicalPlan): OptimizeZorderCommandBase = {
+ OptimizeZorderCommand(catalogTable, query)
+ }
+}
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala
index 6069e0b..29e7d5b 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala
@@ -87,3 +87,8 @@ abstract class ZorderBase extends Expression {
isNull = FalseLiteral)
}
}
+
+case class Zorder(children: Seq[Expression]) extends ZorderBase {
+ protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+ copy(children = newChildren)
+}
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala
index a235f04..de6457e 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala
@@ -104,6 +104,12 @@ abstract class ZorderSparkSqlExtensionsParserBase extends ParserInterface {
}
}
+class ZorderSparkSqlExtensionsParser(
+ override val delegate: ParserInterface)
+ extends ZorderSparkSqlExtensionsParserBase {
+ def astBuilder: ZorderSqlAstBuilderBase = new ZorderSqlAstBuilder
+}
+
/* Copied from Apache Spark's to avoid dependency on Spark Internals */
class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
override def consume(): Unit = wrapped.consume
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala
index 2467430..33ed18a 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala
@@ -415,3 +415,14 @@ abstract class ZorderSqlAstBuilderBase extends ZorderSqlExtensionsBaseVisitor[An
}
}
}
+
+class ZorderSqlAstBuilder extends ZorderSqlAstBuilderBase {
+ override def buildZorder(child: Seq[Expression]): ZorderBase = {
+ Zorder(child)
+ }
+
+ override def buildOptimizeZorderStatement(
+ tableIdentifier: Seq[String], query: LogicalPlan): OptimizeZorderStatementBase = {
+ OptimizeZorderStatement(tableIdentifier, query)
+ }
+}
diff --git a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
similarity index 91%
rename from dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
rename to dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
index 7abeee4..8ffa4bd 100644
--- a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
+++ b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql
+import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.kyuubi.sql.KyuubiSQLConf
@@ -28,6 +29,12 @@ class InsertShuffleNodeBeforeJoinSuite extends KyuubiSparkSQLExtensionTest {
setupData()
}
+ override def sparkConf(): SparkConf = {
+ super.sparkConf()
+ .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+ "org.apache.kyuubi.sql.KyuubiSparkSQLCommonExtension")
+ }
+
test("force shuffle before join") {
def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
var expectedResult: Seq[Row] = Seq.empty
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
similarity index 97%
rename from dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
rename to dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
index 9d8a842..490e252 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++ b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -25,13 +25,18 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectComma
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
import org.apache.kyuubi.sql.zorder.{OptimizeZorderCommandBase, Zorder}
trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper {
+ override def sparkConf(): SparkConf = {
+ super.sparkConf()
+ .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+ "org.apache.kyuubi.sql.KyuubiSparkSQLCommonExtension")
+ }
test("optimize unpartitioned table") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
@@ -309,7 +314,10 @@ trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper
val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
spark.sessionState.analyzer.checkAnalysis(plan)
}.getMessage
- assert(msg.contains("Unsupported z-order type: null"))
+ // the NullType catalog string is "null" before Spark 3.2.0 and "void" since 3.2.0
+ // see https://github.com/apache/spark/pull/33437
+ assert(msg.contains("Unsupported z-order type:") &&
+ (msg.contains("null") || msg.contains("void")))
}
checkZorderPlan(Zorder(Seq(Literal(null, NullType))))