Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/09/16 14:43:08 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1105] Zorder codegen support
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5602c29 [KYUUBI #1105] Zorder codegen support
5602c29 is described below
commit 5602c29b3694b3d2fdec6ab8334687180d9135d9
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Thu Sep 16 22:42:52 2021 +0800
[KYUUBI #1105] Zorder codegen support
### _Why are the changes needed?_
1. Add Zorder codegen support.
2. Move all of the Zorder tests to a separate file.
3. Add more Zorder unit tests.
Generated code (generate.java):
```java
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 009 */
/* 010 */   public SpecificUnsafeProjection(Object[] references) {
/* 011 */     this.references = references;
/* 012 */     mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 013 */
/* 014 */   }
/* 015 */
/* 016 */   public void initialize(int partitionIndex) {
/* 017 */
/* 018 */   }
/* 019 */
/* 020 */   // Scala.Function1 need this
/* 021 */   public java.lang.Object apply(java.lang.Object row) {
/* 022 */     return apply((InternalRow) row);
/* 023 */   }
/* 024 */
/* 025 */   public UnsafeRow apply(InternalRow i) {
/* 026 */     mutableStateArray_0[0].reset();
/* 027 */
/* 028 */
/* 029 */
/* 030 */
/* 031 */     byte[] value_0 = null;
/* 032 */     byte[][] binaryArray_0 = new byte[2][];
/* 033 */
/* 034 */     boolean isNull_1 = i.isNullAt(0);
/* 035 */     long value_1 = isNull_1 ?
/* 036 */     -1L : (i.getLong(0));
/* 037 */     if (isNull_1) {
/* 038 */       binaryArray_0[0] = (byte[]) ((byte[][]) references[0] /* defaultValues */)[0];
/* 039 */     } else {
/* 040 */       binaryArray_0[0] = org.apache.kyuubi.sql.zorder.ZorderBytesUtils.toByte(value_1);
/* 041 */     }
/* 042 */
/* 043 */
/* 044 */     boolean isNull_2 = i.isNullAt(1);
/* 045 */     long value_2 = isNull_2 ?
/* 046 */     -1L : (i.getLong(1));
/* 047 */     if (isNull_2) {
/* 048 */       binaryArray_0[1] = (byte[]) ((byte[][]) references[0] /* defaultValues */)[1];
/* 049 */     } else {
/* 050 */       binaryArray_0[1] = org.apache.kyuubi.sql.zorder.ZorderBytesUtils.toByte(value_2);
/* 051 */     }
/* 052 */
/* 053 */     value_0 = org.apache.kyuubi.sql.zorder.ZorderBytesUtils.interleaveMultiByteArray(binaryArray_0);
/* 054 */     mutableStateArray_0[0].write(0, value_0);
/* 055 */     return (mutableStateArray_0[0].getRow());
/* 056 */   }
/* 057 */
/* 058 */
/* 059 */ }
```
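The `interleaveMultiByteArray` call at the end is what actually produces the z-value: it zips the bits of every column's byte encoding, most-significant bit first and round-robin across columns, so rows that are close on all columns stay close in the resulting sort order. A minimal illustrative sketch of that interleaving (not the Kyuubi `ZorderBytesUtils` source):
```scala
// Illustrative sketch only -- not the actual ZorderBytesUtils implementation.
// Take one bit from each input in turn, MSB first, and pack into the output.
def interleave(arrays: Array[Array[Byte]]): Array[Byte] = {
  val totalBits = arrays.map(_.length * 8).sum
  val out = new Array[Byte]((totalBits + 7) / 8)
  var outBit = 0
  val maxLen = arrays.map(_.length).max
  for (byteIdx <- 0 until maxLen; bit <- 7 to 0 by -1; arr <- arrays) {
    if (byteIdx < arr.length) {
      val b = (arr(byteIdx) >> bit) & 1 // pick one input bit
      out(outBit / 8) = (out(outBit / 8) | (b << (7 - outBit % 8))).toByte
      outBit += 1
    }
  }
  out
}

// e.g. interleave(Array(Array(0xF0.toByte), Array(0x00.toByte)))
//      == Array(0xAA.toByte, 0x00.toByte)  // bit pattern 1010 1010 0000 0000
```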
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
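The new and relocated test cases center on the `OPTIMIZE ... ZORDER BY` statement; for reference, the forms they exercise (taken from the suites in the diff below):
```scala
// Statements driven by the tests; `up` is unpartitioned, `p` is partitioned by `id`.
spark.sql("OPTIMIZE up ZORDER BY c1, c2")             // rewrite the whole table in z-order
spark.sql("OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2") // filters may touch partition columns only
```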
Closes #1109 from cfmcgrady/zorder-codegen.
Closes #1105
f06da25a [Fu Chen] update dependency
8e04dc4e [Fu Chen] JMap -> Array
da00f86c [Fu Chen] remove Logging trait
266b65b3 [Fu Chen] fix style and revert Expression
e15165f3 [Fu Chen] ev.isNull = FalseLiteral
0d77d9de [Fu Chen] review
0d8467f4 [Fu Chen] fix style
5ede5df9 [Fu Chen] refactor test
56e12172 [Fu Chen] clean up
88573102 [Fu Chen] update
5f3d1c02 [Fu Chen] zorder codegen support
Authored-by: Fu Chen <cf...@gmail.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
dev/kyuubi-extension-spark-3-1/pom.xml | 6 +
.../org/apache/kyuubi/sql/zorder/Zorder.scala | 56 ++-
.../apache/spark/sql/KyuubiExtensionSuite.scala | 341 +---------------
.../scala/org/apache/spark/sql/ZorderSuite.scala | 453 +++++++++++++++++++++
pom.xml | 8 +
5 files changed, 516 insertions(+), 348 deletions(-)
diff --git a/dev/kyuubi-extension-spark-3-1/pom.xml b/dev/kyuubi-extension-spark-3-1/pom.xml
index 3048ec4..74aa88e 100644
--- a/dev/kyuubi-extension-spark-3-1/pom.xml
+++ b/dev/kyuubi-extension-spark-3-1/pom.xml
@@ -71,6 +71,12 @@
</dependency>
<dependency>
+ <groupId>org.scalatestplus</groupId>
+ <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
index d26cb27..16863ee 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
@@ -20,12 +20,13 @@ package org.apache.kyuubi.sql.zorder
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral}
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
-case class Zorder(children: Seq[Expression]) extends Expression with CodegenFallback {
+case class Zorder(children: Seq[Expression]) extends Expression {
override def foldable: Boolean = children.forall(_.foldable)
override def nullable: Boolean = false
override def dataType: DataType = BinaryType
@@ -42,21 +43,48 @@ case class Zorder(children: Seq[Expression]) extends Expression with CodegenFall
}
@transient
- private lazy val defaultNullValues: Seq[Any] = {
- children.map(child => ZorderBytesUtils.defaultValue(child.dataType))
- }
+ private lazy val defaultNullValues: Array[Array[Byte]] =
+ children.map(_.dataType)
+ .map(ZorderBytesUtils.defaultValue)
+ .map(ZorderBytesUtils.toByte)
+ .toArray
override def eval(input: InternalRow): Any = {
- val evaluated = children.zipWithIndex.map { case (child: Expression, index) =>
- val v = child.eval(input)
- if (v == null) {
- defaultNullValues(index)
- } else {
- v
- }
+ val binaryArr = children.zipWithIndex.map {
+ case (child: Expression, index) =>
+ val v = child.eval(input)
+ if (v == null) {
+ defaultNullValues(index)
+ } else {
+ ZorderBytesUtils.toByte(v)
+ }
}
+ ZorderBytesUtils.interleaveMultiByteArray(binaryArr.toArray)
+ }
- val binaryArr = evaluated.map(ZorderBytesUtils.toByte).toArray
- ZorderBytesUtils.interleaveMultiByteArray(binaryArr)
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ val evals = children.map(_.genCode(ctx))
+ val defaultValues = ctx.addReferenceObj("defaultValues", defaultNullValues)
+ val binaryArray = ctx.freshName("binaryArray")
+ val util = ZorderBytesUtils.getClass.getName.stripSuffix("$")
+ val inputs = evals.zipWithIndex.map {
+ case (eval, index) =>
+ s"""
+ |${eval.code}
+ |if (${eval.isNull}) {
+ | $binaryArray[$index] = (byte[]) $defaultValues[$index];
+ |} else {
+ | $binaryArray[$index] = $util.toByte(${eval.value});
+ |}
+ |""".stripMargin
+ }
+ ev.copy(code =
+ code"""
+ |byte[] ${ev.value} = null;
+ |byte[][] $binaryArray = new byte[${evals.length}][];
+ |${inputs.mkString("\n")}
+ |${ev.value} = $util.interleaveMultiByteArray($binaryArray);
+ |""".stripMargin,
+ isNull = FalseLiteral)
}
}
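One way to exercise the new `doGenCode` path directly is to compile the expression into an `UnsafeProjection`; the sketch below uses Spark's internal Catalyst API (common in tests, not a public interface), so treat it as illustrative rather than part of this change:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
import org.apache.spark.sql.types.LongType

import org.apache.kyuubi.sql.zorder.Zorder

// Two nullable long inputs, matching the generate.java dump above.
val zorder = Zorder(Seq(
  BoundReference(0, LongType, nullable = true),
  BoundReference(1, LongType, nullable = true)))

// UnsafeProjection.create compiles the expression through doGenCode,
// producing a SpecificUnsafeProjection like the one shown earlier.
val proj = UnsafeProjection.create(Seq(zorder))
val zValue: Array[Byte] = proj(InternalRow(1L, 2L)).getBinary(0)
```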
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index b5bddd1..00d47ca 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -19,22 +19,19 @@ package org.apache.spark.sql
import scala.collection.mutable.Set
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal, Multiply}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
+import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, CustomShuffleReaderExec, QueryStageExec}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
-import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
+import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType}
+import org.apache.spark.util.Utils
-import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf, KyuubiSQLExtensionException}
+import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
-import org.apache.kyuubi.sql.zorder.Zorder
class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper {
@@ -62,6 +59,8 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
if (_spark != null) {
_spark.stop()
}
+ Utils.deleteRecursively(new java.io.File("spark-warehouse"))
+ Utils.deleteRecursively(new java.io.File("metastore_db"))
}
private def setupData(): Unit = {
@@ -921,143 +920,6 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
// scalastyle:on println
}
- test("optimize unpartitioned table") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- withTable("up") {
- sql(s"DROP TABLE IF EXISTS up")
-
- val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
- Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
- Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
- Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
- sql(s"CREATE TABLE up (c1 INT, c2 INT, c3 INT)")
- sql(s"INSERT INTO TABLE up VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
- val e = intercept[KyuubiSQLExtensionException] {
- sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
- }
- assert(e.getMessage == "Filters are only supported for partitioned table")
-
- sql("OPTIMIZE up ZORDER BY c1, c2")
- val res = sql("SELECT c1, c2 FROM up").collect()
-
- assert(res.length == 16)
-
- for (i <- target.indices) {
- val t = target(i)
- val r = res(i)
- assert(t(0) == r.getInt(0))
- assert(t(1) == r.getInt(1))
- }
- }
- }
- }
-
- test("optimize partitioned table") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- withTable("p") {
- sql("DROP TABLE IF EXISTS p")
-
- val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
- Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
- Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
- Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-
- sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
- sql(s"OPTIMIZE p ZORDER BY c1, c2")
-
- val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
- val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
- assert(res1.length == 16)
- assert(res2.length == 16)
-
- for (i <- target.indices) {
- val t = target(i)
- val r1 = res1(i)
- assert(t(0) == r1.getInt(0))
- assert(t(1) == r1.getInt(1))
-
- val r2 = res2(i)
- assert(t(0) == r2.getInt(0))
- assert(t(1) == r2.getInt(1))
- }
- }
- }
- }
-
- test("optimize partitioned table with filters") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- withTable("p") {
- sql("DROP TABLE IF EXISTS p")
-
- val target1 = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
- Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
- Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
- Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
- val target2 = Seq(Seq(0, 0), Seq(0, 1), Seq(0, 2), Seq(0, 3),
- Seq(1, 0), Seq(1, 1), Seq(1, 2), Seq(1, 3),
- Seq(2, 0), Seq(2, 1), Seq(2, 2), Seq(2, 3),
- Seq(3, 0), Seq(3, 1), Seq(3, 2), Seq(3, 3))
- sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
- sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
- sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
- "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
- "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
- "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
- "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
- val e = intercept[KyuubiSQLExtensionException](
- sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
- )
- assert(e.getMessage == "Only partition column filters are allowed")
-
- sql(s"OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2")
-
- val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
- val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
- assert(res1.length == 16)
- assert(res2.length == 16)
-
- for (i <- target1.indices) {
- val t1 = target1(i)
- val r1 = res1(i)
- assert(t1(0) == r1.getInt(0))
- assert(t1(1) == r1.getInt(1))
-
- val t2 = target2(i)
- val r2 = res2(i)
- assert(t2(0) == r2.getInt(0))
- assert(t2(1) == r2.getInt(1))
- }
- }
- }
- }
-
// TODO: #1064
// TODO: The matching rule for sql classification should be generated automatically not manually
test("get simple name for auxiliary statement") {
@@ -1457,195 +1319,6 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
println("auxiliary statement simple name is :" + auxiStatementSimpleName.toSeq.sorted)
// scalastyle:on println
}
-
- test("optimize zorder with datasource table") {
- // TODO remove this if we support datasource table
- withTable("t") {
- sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
- val msg = intercept[KyuubiSQLExtensionException] {
- sql("OPTIMIZE t ZORDER BY c1, c2")
- }.getMessage
- assert(msg.contains("only support hive table"))
- }
- }
-
- private def checkZorderTable(
- enabled: Boolean,
- cols: String,
- planHasRepartition: Boolean,
- resHasSort: Boolean): Unit = {
- def checkSort(plan: LogicalPlan): Unit = {
- assert(plan.isInstanceOf[Sort] === resHasSort)
- if (plan.isInstanceOf[Sort]) {
- val refs = plan.asInstanceOf[Sort].order.head
- .child.asInstanceOf[Zorder].children.map(_.references.head)
- val colArr = cols.split(",")
- assert(refs.size === colArr.size)
- refs.zip(colArr).foreach { case (ref, col) =>
- assert(ref.name === col.trim)
- }
- }
- }
-
- val repartition = if (planHasRepartition) {
- "/*+ repartition */"
- } else {
- ""
- }
- withSQLConf("spark.sql.shuffle.partitions" -> "1") {
- // hive
- withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
- withTable("zorder_t1", "zorder_t2_true", "zorder_t2_false") {
- sql(
- s"""
- |CREATE TABLE zorder_t1 (c1 int, c2 string, c3 long, c4 double) STORED AS PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |""".stripMargin)
- val df1 = sql(s"""
- |INSERT INTO TABLE zorder_t1
- |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
- |""".stripMargin)
- assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHiveTable])
- checkSort(df1.queryExecution.analyzed.children.head)
-
- Seq("true", "false").foreach { optimized =>
- withSQLConf("spark.sql.hive.convertMetastoreCtas" -> optimized,
- "spark.sql.hive.convertMetastoreParquet" -> optimized) {
- val df2 =
- sql(
- s"""
- |CREATE TABLE zorder_t2_$optimized STORED AS PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |
- |SELECT $repartition * FROM
- |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
- |""".stripMargin)
- if (optimized.toBoolean) {
- assert(df2.queryExecution.analyzed
- .isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
- } else {
- assert(df2.queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
- }
- checkSort(df2.queryExecution.analyzed.children.head)
- }
- }
- }
- }
-
- // datasource
- withTable("zorder_t3", "zorder_t4") {
- sql(
- s"""
- |CREATE TABLE zorder_t3 (c1 int, c2 string, c3 long, c4 double) USING PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |""".stripMargin)
- val df1 = sql(s"""
- |INSERT INTO TABLE zorder_t3
- |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
- |""".stripMargin)
- assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHadoopFsRelationCommand])
- checkSort(df1.queryExecution.analyzed.children.head)
-
- val df2 =
- sql(
- s"""
- |CREATE TABLE zorder_t4 USING PARQUET
- |TBLPROPERTIES (
- | 'kyuubi.zorder.enabled' = '$enabled',
- | 'kyuubi.zorder.cols' = '$cols')
- |
- |SELECT $repartition * FROM
- |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
- |""".stripMargin)
- assert(df2.queryExecution.analyzed.isInstanceOf[CreateDataSourceTableAsSelectCommand])
- checkSort(df2.queryExecution.analyzed.children.head)
- }
- }
- }
-
- test("Support insert zorder by table properties") {
- withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "false") {
- checkZorderTable(true, "c1", false, false)
- checkZorderTable(false, "c1", false, false)
- }
- withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "true") {
- checkZorderTable(true, "", false, false)
- checkZorderTable(true, "c5", false, false)
- checkZorderTable(true, "c1,c5", false, false)
- checkZorderTable(false, "c3", false, false)
- checkZorderTable(true, "c3", true, false)
- checkZorderTable(true, "c3", false, true)
- checkZorderTable(true, "c2,c4", false, true)
- checkZorderTable(true, "c4, c2, c1, c3", false, true)
- }
- }
-
- test("zorder: check unsupported data type") {
- def checkZorderPlan(zorder: Expression): Unit = {
- val msg = intercept[AnalysisException] {
- val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
- spark.sessionState.analyzer.checkAnalysis(plan)
- }.getMessage
- assert(msg.contains("Unsupported z-order type: null"))
- }
-
- checkZorderPlan(Zorder(Seq(Literal(null, NullType))))
- checkZorderPlan(Zorder(Seq(Literal(1, IntegerType), Literal(null, NullType))))
- }
-
- test("zorder: check supported data type") {
- val children = Seq(
- Literal.create(false, BooleanType),
- Literal.create(null, BooleanType),
- Literal.create(1.toByte, ByteType),
- Literal.create(null, ByteType),
- Literal.create(1.toShort, ShortType),
- Literal.create(null, ShortType),
- Literal.create(1, IntegerType),
- Literal.create(null, IntegerType),
- Literal.create(1L, LongType),
- Literal.create(null, LongType),
- Literal.create(1f, FloatType),
- Literal.create(null, FloatType),
- Literal.create(1d, DoubleType),
- Literal.create(null, DoubleType),
- Literal.create("1", StringType),
- Literal.create(null, StringType),
- Literal.create(1L, TimestampType),
- Literal.create(null, TimestampType),
- Literal.create(1, DateType),
- Literal.create(null, DateType),
- Literal.create(BigDecimal(1, 1), DecimalType(1, 1)),
- Literal.create(null, DecimalType(1, 1))
- )
- val zorder = Zorder(children)
- val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
- spark.sessionState.analyzer.checkAnalysis(plan)
- assert(zorder.foldable)
- val res = zorder.eval().asInstanceOf[Array[Byte]]
- val expected = Array(
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
- 0x50, 0x54, 0x15, 0x05, 0x49, 0x51, 0x54, 0x55, 0x15, 0x45,
- 0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
- 0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
- 0x44, 0x44, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0x88,
- 0x88, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0xa8, 0x9a,
- 0x2a, 0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0x2a,
- 0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0xca, 0x8a,
- 0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0x8a, 0x8a,
- 0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0xb2, 0xa2,
- 0xaa, 0x8a, 0x9a, 0xaa, 0x2a, 0x6a, 0xa8, 0xa8, 0xaa, 0xa2,
- 0xa2, 0xaa, 0x8a, 0x8a, 0xaa, 0x2f, 0x6b, 0xfc)
- .map(_.toByte)
- assert(java.util.Arrays.equals(res, expected))
- }
-
test("test watchdog with scan maxHivePartitions") {
withTable("test", "temp") {
sql(
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
new file mode 100644
index 0000000..f7e7215
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Expression, ExpressionEvalHelper, Literal, NullsLast, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project, Sort}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
+import org.apache.kyuubi.sql.zorder.Zorder
+
+trait ZorderSuite extends QueryTest
+ with SQLTestUtils
+ with AdaptiveSparkPlanHelper
+ with ExpressionEvalHelper {
+
+ var _spark: SparkSession = _
+ override def spark: SparkSession = _spark
+
+ protected override def beforeAll(): Unit = {
+ _spark = SparkSession.builder()
+ .master("local[1]")
+ .config(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+ "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
+ .config(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+ .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
+ .config("spark.hadoop.hive.metastore.client.capability.check", "false")
+ .config("spark.ui.enabled", "false")
+ .config(sparkConf)
+ .enableHiveSupport()
+ .getOrCreate()
+ super.beforeAll()
+ }
+
+ protected override def afterAll(): Unit = {
+ super.afterAll()
+ if (_spark != null) {
+ _spark.stop()
+ }
+ Utils.deleteRecursively(new java.io.File("spark-warehouse"))
+ Utils.deleteRecursively(new java.io.File("metastore_db"))
+ }
+
+ test("optimize unpartitioned table") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ withTable("up") {
+ sql(s"DROP TABLE IF EXISTS up")
+
+ val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
+ Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
+ Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
+ Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
+ sql(s"CREATE TABLE up (c1 INT, c2 INT, c3 INT)")
+ sql(s"INSERT INTO TABLE up VALUES" +
+ "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+ "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+ "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+ "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
+
+ val e = intercept[KyuubiSQLExtensionException] {
+ sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
+ }
+ assert(e.getMessage == "Filters are only supported for partitioned table")
+
+ sql("OPTIMIZE up ZORDER BY c1, c2")
+ val res = sql("SELECT c1, c2 FROM up").collect()
+
+ assert(res.length == 16)
+
+ for (i <- target.indices) {
+ val t = target(i)
+ val r = res(i)
+ assert(t(0) == r.getInt(0))
+ assert(t(1) == r.getInt(1))
+ }
+ }
+ }
+ }
+
+ test("optimize partitioned table") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ withTable("p") {
+ sql("DROP TABLE IF EXISTS p")
+
+ val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
+ Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
+ Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
+ Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
+
+ sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
+ sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
+ sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
+ sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
+ "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+ "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+ "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+ "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
+ sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
+ "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+ "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+ "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+ "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
+
+ sql(s"OPTIMIZE p ZORDER BY c1, c2")
+
+ val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
+ val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
+
+ assert(res1.length == 16)
+ assert(res2.length == 16)
+
+ for (i <- target.indices) {
+ val t = target(i)
+ val r1 = res1(i)
+ assert(t(0) == r1.getInt(0))
+ assert(t(1) == r1.getInt(1))
+
+ val r2 = res2(i)
+ assert(t(0) == r2.getInt(0))
+ assert(t(1) == r2.getInt(1))
+ }
+ }
+ }
+ }
+
+ test("optimize partitioned table with filters") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ withTable("p") {
+ sql("DROP TABLE IF EXISTS p")
+
+ val target1 = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
+ Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
+ Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
+ Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
+ val target2 = Seq(Seq(0, 0), Seq(0, 1), Seq(0, 2), Seq(0, 3),
+ Seq(1, 0), Seq(1, 1), Seq(1, 2), Seq(1, 3),
+ Seq(2, 0), Seq(2, 1), Seq(2, 2), Seq(2, 3),
+ Seq(3, 0), Seq(3, 1), Seq(3, 2), Seq(3, 3))
+ sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
+ sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
+ sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
+ sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
+ "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+ "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+ "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+ "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
+ sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
+ "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+ "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+ "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+ "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
+
+ val e = intercept[KyuubiSQLExtensionException](
+ sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
+ )
+ assert(e.getMessage == "Only partition column filters are allowed")
+
+ sql(s"OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2")
+
+ val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
+ val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
+
+ assert(res1.length == 16)
+ assert(res2.length == 16)
+
+ for (i <- target1.indices) {
+ val t1 = target1(i)
+ val r1 = res1(i)
+ assert(t1(0) == r1.getInt(0))
+ assert(t1(1) == r1.getInt(1))
+
+ val t2 = target2(i)
+ val r2 = res2(i)
+ assert(t2(0) == r2.getInt(0))
+ assert(t2(1) == r2.getInt(1))
+ }
+ }
+ }
+ }
+
+ test("optimize zorder with datasource table") {
+ // TODO remove this if we support datasource table
+ withTable("t") {
+ sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
+ val msg = intercept[KyuubiSQLExtensionException] {
+ sql("OPTIMIZE t ZORDER BY c1, c2")
+ }.getMessage
+ assert(msg.contains("only support hive table"))
+ }
+ }
+
+ private def checkZorderTable(
+ enabled: Boolean,
+ cols: String,
+ planHasRepartition: Boolean,
+ resHasSort: Boolean): Unit = {
+ def checkSort(plan: LogicalPlan): Unit = {
+ assert(plan.isInstanceOf[Sort] === resHasSort)
+ if (plan.isInstanceOf[Sort]) {
+ val refs = plan.asInstanceOf[Sort].order.head
+ .child.asInstanceOf[Zorder].children.map(_.references.head)
+ val colArr = cols.split(",")
+ assert(refs.size === colArr.size)
+ refs.zip(colArr).foreach { case (ref, col) =>
+ assert(ref.name === col.trim)
+ }
+ }
+ }
+
+ val repartition = if (planHasRepartition) {
+ "/*+ repartition */"
+ } else {
+ ""
+ }
+ withSQLConf("spark.sql.shuffle.partitions" -> "1") {
+ // hive
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
+ withTable("zorder_t1", "zorder_t2_true", "zorder_t2_false") {
+ sql(
+ s"""
+ |CREATE TABLE zorder_t1 (c1 int, c2 string, c3 long, c4 double) STORED AS PARQUET
+ |TBLPROPERTIES (
+ | 'kyuubi.zorder.enabled' = '$enabled',
+ | 'kyuubi.zorder.cols' = '$cols')
+ |""".stripMargin)
+ val df1 = sql(s"""
+ |INSERT INTO TABLE zorder_t1
+ |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
+ |""".stripMargin)
+ assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHiveTable])
+ checkSort(df1.queryExecution.analyzed.children.head)
+
+ Seq("true", "false").foreach { optimized =>
+ withSQLConf("spark.sql.hive.convertMetastoreCtas" -> optimized,
+ "spark.sql.hive.convertMetastoreParquet" -> optimized) {
+ val df2 =
+ sql(
+ s"""
+ |CREATE TABLE zorder_t2_$optimized STORED AS PARQUET
+ |TBLPROPERTIES (
+ | 'kyuubi.zorder.enabled' = '$enabled',
+ | 'kyuubi.zorder.cols' = '$cols')
+ |
+ |SELECT $repartition * FROM
+ |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
+ |""".stripMargin)
+ if (optimized.toBoolean) {
+ assert(df2.queryExecution.analyzed
+ .isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
+ } else {
+ assert(df2.queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
+ }
+ checkSort(df2.queryExecution.analyzed.children.head)
+ }
+ }
+ }
+ }
+
+ // datasource
+ withTable("zorder_t3", "zorder_t4") {
+ sql(
+ s"""
+ |CREATE TABLE zorder_t3 (c1 int, c2 string, c3 long, c4 double) USING PARQUET
+ |TBLPROPERTIES (
+ | 'kyuubi.zorder.enabled' = '$enabled',
+ | 'kyuubi.zorder.cols' = '$cols')
+ |""".stripMargin)
+ val df1 = sql(s"""
+ |INSERT INTO TABLE zorder_t3
+ |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
+ |""".stripMargin)
+ assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHadoopFsRelationCommand])
+ checkSort(df1.queryExecution.analyzed.children.head)
+
+ val df2 =
+ sql(
+ s"""
+ |CREATE TABLE zorder_t4 USING PARQUET
+ |TBLPROPERTIES (
+ | 'kyuubi.zorder.enabled' = '$enabled',
+ | 'kyuubi.zorder.cols' = '$cols')
+ |
+ |SELECT $repartition * FROM
+ |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
+ |""".stripMargin)
+ assert(df2.queryExecution.analyzed.isInstanceOf[CreateDataSourceTableAsSelectCommand])
+ checkSort(df2.queryExecution.analyzed.children.head)
+ }
+ }
+ }
+
+ test("Support insert zorder by table properties") {
+ withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "false") {
+ checkZorderTable(true, "c1", false, false)
+ checkZorderTable(false, "c1", false, false)
+ }
+ withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "true") {
+ checkZorderTable(true, "", false, false)
+ checkZorderTable(true, "c5", false, false)
+ checkZorderTable(true, "c1,c5", false, false)
+ checkZorderTable(false, "c3", false, false)
+ checkZorderTable(true, "c3", true, false)
+ checkZorderTable(true, "c3", false, true)
+ checkZorderTable(true, "c2,c4", false, true)
+ checkZorderTable(true, "c4, c2, c1, c3", false, true)
+ }
+ }
+
+ test("zorder: check unsupported data type") {
+ def checkZorderPlan(zorder: Expression): Unit = {
+ val msg = intercept[AnalysisException] {
+ val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
+ spark.sessionState.analyzer.checkAnalysis(plan)
+ }.getMessage
+ assert(msg.contains("Unsupported z-order type: null"))
+ }
+
+ checkZorderPlan(Zorder(Seq(Literal(null, NullType))))
+ checkZorderPlan(Zorder(Seq(Literal(1, IntegerType), Literal(null, NullType))))
+ }
+
+ test("zorder: check supported data type") {
+ val children = Seq(
+ Literal.create(false, BooleanType),
+ Literal.create(null, BooleanType),
+ Literal.create(1.toByte, ByteType),
+ Literal.create(null, ByteType),
+ Literal.create(1.toShort, ShortType),
+ Literal.create(null, ShortType),
+ Literal.create(1, IntegerType),
+ Literal.create(null, IntegerType),
+ Literal.create(1L, LongType),
+ Literal.create(null, LongType),
+ Literal.create(1f, FloatType),
+ Literal.create(null, FloatType),
+ Literal.create(1d, DoubleType),
+ Literal.create(null, DoubleType),
+ Literal.create("1", StringType),
+ Literal.create(null, StringType),
+ Literal.create(1L, TimestampType),
+ Literal.create(null, TimestampType),
+ Literal.create(1, DateType),
+ Literal.create(null, DateType),
+ Literal.create(BigDecimal(1, 1), DecimalType(1, 1)),
+ Literal.create(null, DecimalType(1, 1))
+ )
+ val zorder = Zorder(children)
+ val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
+ spark.sessionState.analyzer.checkAnalysis(plan)
+ assert(zorder.foldable)
+ val expected = Array(
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x50, 0x54, 0x15, 0x05, 0x49, 0x51, 0x54, 0x55, 0x15, 0x45,
+ 0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
+ 0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
+ 0x44, 0x44, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0x88,
+ 0x88, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0xa8, 0x9a,
+ 0x2a, 0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0x2a,
+ 0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0xca, 0x8a,
+ 0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0x8a, 0x8a,
+ 0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0xb2, 0xa2,
+ 0xaa, 0x8a, 0x9a, 0xaa, 0x2a, 0x6a, 0xa8, 0xa8, 0xaa, 0xa2,
+ 0xa2, 0xaa, 0x8a, 0x8a, 0xaa, 0x2f, 0x6b, 0xfc)
+ .map(_.toByte)
+ checkEvaluation(zorder, expected, InternalRow.fromSeq(children))
+ }
+
+ test("sort with zorder -- int column") {
+ // TODO: add more datatype unit test
+ def checkSort(input: DataFrame, expected: Seq[Row]): Unit = {
+ withTempDir { dir =>
+ input.repartition(3).write.mode("overwrite").format("json").save(dir.getCanonicalPath)
+ val df = spark.read.format("json")
+ .load(dir.getCanonicalPath)
+ .repartition(1)
+ val exprs = Seq("c1", "c2").map(col).map(_.expr)
+ val sortOrder = SortOrder(Zorder(exprs), Ascending, NullsLast, Seq.empty)
+ val zorderSort = Sort(Seq(sortOrder), true, df.logicalPlan)
+ val result = Dataset.ofRows(spark, zorderSort)
+ checkAnswer(result, expected)
+ }
+ }
+ val session = spark
+ import session.implicits._
+ // generate 4 * 4 matrix
+ val len = 3
+ val input = spark.range(len + 1)
+ .select('id as "c1", explode(sequence(lit(0), lit(len))) as "c2")
+ val expected =
+ Row(0, 0) :: Row(1, 0) :: Row(0, 1) :: Row(1, 1) ::
+ Row(2, 0) :: Row(3, 0) :: Row(2, 1) :: Row(3, 1) ::
+ Row(0, 2) :: Row(1, 2) :: Row(0, 3) :: Row(1, 3) ::
+ Row(2, 2) :: Row(3, 2) :: Row(2, 3) :: Row(3, 3) :: Nil
+ checkSort(input, expected)
+
+ // contains null value case.
+ val nullDF = spark.range(1).select(lit(null) cast LongType).as[java.lang.Long]
+ val input2 = spark.range(len)
+ .union(nullDF)
+ .select(
+ 'id as "c1",
+ explode(concat(sequence(lit(0), lit(len - 1)), array(lit(null)))) as "c2")
+ val expected2 = Row(null, null) :: Row(0, null) :: Row(1, null) :: Row(2, null) ::
+ Row(null, 0) :: Row(null, 1) :: Row(null, 2) :: Row(0, 0) ::
+ Row(1, 0) :: Row(0, 1) :: Row(1, 1) :: Row(2, 0) ::
+ Row(2, 1) :: Row(0, 2) :: Row(1, 2) :: Row(2, 2) :: Nil
+ checkSort(input2, expected2)
+ }
+
+ def sparkConf(): SparkConf
+}
+
+class ZorderWithCodegenEnabledSuite extends ZorderSuite {
+ override def sparkConf(): SparkConf = {
+ val conf = new SparkConf()
+ conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+ conf
+ }
+}
+
+class ZorderWithCodegenDisabledSuite extends ZorderSuite {
+ override def sparkConf(): SparkConf = {
+ val conf = new SparkConf()
+ conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+ conf.set(SQLConf.CODEGEN_FACTORY_MODE.key, "NO_CODEGEN")
+ conf
+ }
+}
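The two concrete suites above run the identical test body with whole-stage codegen forced on and forced off, covering both `Zorder.eval` and the new `Zorder.doGenCode`. A condensed, stand-alone sketch of that pattern (hypothetical query; the real suites drive `OPTIMIZE ... ZORDER BY` against Hive tables):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}

// Run one query under a given codegen mode and collect the results.
def runWith(codegen: Boolean): Seq[Row] = {
  val conf = new SparkConf().setMaster("local[1]")
    .set("spark.sql.codegen.wholeStage", codegen.toString)
  if (!codegen) conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")
  val spark = SparkSession.builder().config(conf).getOrCreate()
  try spark.sql("SELECT id FROM range(4)").collect().toSeq
  finally spark.stop()
}

// The interpreted and compiled paths must agree.
assert(runWith(codegen = true) == runWith(codegen = false))
```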
diff --git a/pom.xml b/pom.xml
index 927e138..5c363ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
<ldapsdk.version>5.1.4</ldapsdk.version>
<parquet.version>1.10.1</parquet.version>
<prometheus.version>0.10.0</prometheus.version>
+ <scalacheck.version>3.2.9.0</scalacheck.version>
<scalatest.version>3.2.9</scalatest.version>
<scopt.version>4.0.1</scopt.version>
<slf4j.version>1.7.30</slf4j.version>
@@ -406,6 +407,13 @@
</dependency>
<dependency>
+ <groupId>org.scalatestplus</groupId>
+ <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+ <version>${scalacheck.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>