You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/09/16 14:43:08 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1105] Zorder codegen support

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5602c29  [KYUUBI #1105] Zorder codegen support
5602c29 is described below

commit 5602c29b3694b3d2fdec6ab8334687180d9135d9
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Thu Sep 16 22:42:52 2021 +0800

    [KYUUBI #1105] Zorder codegen support
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
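    1. Add codegen support for the Zorder expression (implement `doGenCode` instead of relying on `CodegenFallback`).
    2. Move all of the zorder tests to a separate file (`ZorderSuite.scala`).
    3. Add more zorder unit tests.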
    
    Example of the generated code (generate.java) for a Zorder expression with two inputs:
    ```java
    /* 001 */ public java.lang.Object generate(Object[] references) {
    /* 002 */   return new SpecificUnsafeProjection(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
    /* 006 */
    /* 007 */   private Object[] references;
    /* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
    /* 009 */
    /* 010 */   public SpecificUnsafeProjection(Object[] references) {
    /* 011 */     this.references = references;
    /* 012 */     mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
    /* 013 */
    /* 014 */   }
    /* 015 */
    /* 016 */   public void initialize(int partitionIndex) {
    /* 017 */
    /* 018 */   }
    /* 019 */
    /* 020 */   // Scala.Function1 need this
    /* 021 */   public java.lang.Object apply(java.lang.Object row) {
    /* 022 */     return apply((InternalRow) row);
    /* 023 */   }
    /* 024 */
    /* 025 */   public UnsafeRow apply(InternalRow i) {
    /* 026 */     mutableStateArray_0[0].reset();
    /* 027 */
    /* 028 */
    /* 029 */
    /* 030 */
    /* 031 */     byte[] value_0 = null;
    /* 032 */     byte[][] binaryArray_0 = new byte[2][];
    /* 033 */
    /* 034 */     boolean isNull_1 = i.isNullAt(0);
    /* 035 */     long value_1 = isNull_1 ?
    /* 036 */     -1L : (i.getLong(0));
    /* 037 */     if (isNull_1) {
    /* 038 */       binaryArray_0[0] = (byte[]) ((byte[][]) references[0] /* defaultValues */)[0];
    /* 039 */     } else {
    /* 040 */       binaryArray_0[0] = org.apache.kyuubi.sql.zorder.ZorderBytesUtils.toByte(value_1);
    /* 041 */     }
    /* 042 */
    /* 043 */
    /* 044 */     boolean isNull_2 = i.isNullAt(1);
    /* 045 */     long value_2 = isNull_2 ?
    /* 046 */     -1L : (i.getLong(1));
    /* 047 */     if (isNull_2) {
    /* 048 */       binaryArray_0[1] = (byte[]) ((byte[][]) references[0] /* defaultValues */)[1];
    /* 049 */     } else {
    /* 050 */       binaryArray_0[1] = org.apache.kyuubi.sql.zorder.ZorderBytesUtils.toByte(value_2);
    /* 051 */     }
    /* 052 */
    /* 053 */     value_0 = org.apache.kyuubi.sql.zorder.ZorderBytesUtils.interleaveMultiByteArray(binaryArray_0);
    /* 054 */     mutableStateArray_0[0].write(0, value_0);
    /* 055 */     return (mutableStateArray_0[0].getRow());
    /* 056 */   }
    /* 057 */
    /* 058 */
    /* 059 */ }
    ```
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1109 from cfmcgrady/zorder-codegen.
    
    Closes #1105
    
    f06da25a [Fu Chen] update dependency
    8e04dc4e [Fu Chen] JMap -> Array
    da00f86c [Fu Chen] remove Logging trait
    266b65b3 [Fu Chen] fix style and revert Expression
    e15165f3 [Fu Chen] ev.isNull = FalseLiteral
    0d77d9de [Fu Chen] review
    0d8467f4 [Fu Chen] fix style
    5ede5df9 [Fu Chen] refactor test
    56e12172 [Fu Chen] clean up
    88573102 [Fu Chen] update
    5f3d1c02 [Fu Chen] zorder codegen support
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 dev/kyuubi-extension-spark-3-1/pom.xml             |   6 +
 .../org/apache/kyuubi/sql/zorder/Zorder.scala      |  56 ++-
 .../apache/spark/sql/KyuubiExtensionSuite.scala    | 341 +---------------
 .../scala/org/apache/spark/sql/ZorderSuite.scala   | 453 +++++++++++++++++++++
 pom.xml                                            |   8 +
 5 files changed, 516 insertions(+), 348 deletions(-)

diff --git a/dev/kyuubi-extension-spark-3-1/pom.xml b/dev/kyuubi-extension-spark-3-1/pom.xml
index 3048ec4..74aa88e 100644
--- a/dev/kyuubi-extension-spark-3-1/pom.xml
+++ b/dev/kyuubi-extension-spark-3-1/pom.xml
@@ -71,6 +71,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.scalatestplus</groupId>
+            <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
index d26cb27..16863ee 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
@@ -20,12 +20,13 @@ package org.apache.kyuubi.sql.zorder
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral}
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.types.{BinaryType, DataType}
 
 import org.apache.kyuubi.sql.KyuubiSQLExtensionException
 
-case class Zorder(children: Seq[Expression]) extends Expression with CodegenFallback {
+case class Zorder(children: Seq[Expression]) extends Expression {
   override def foldable: Boolean = children.forall(_.foldable)
   override def nullable: Boolean = false
   override def dataType: DataType = BinaryType
@@ -42,21 +43,48 @@ case class Zorder(children: Seq[Expression]) extends Expression with CodegenFall
   }
 
   @transient
-  private lazy val defaultNullValues: Seq[Any] = {
-    children.map(child => ZorderBytesUtils.defaultValue(child.dataType))
-  }
+  private lazy val defaultNullValues: Array[Array[Byte]] =
+    children.map(_.dataType)
+      .map(ZorderBytesUtils.defaultValue)
+      .map(ZorderBytesUtils.toByte)
+      .toArray
 
   override def eval(input: InternalRow): Any = {
-    val evaluated = children.zipWithIndex.map { case (child: Expression, index) =>
-      val v = child.eval(input)
-      if (v == null) {
-        defaultNullValues(index)
-      } else {
-        v
-      }
+    val binaryArr = children.zipWithIndex.map {
+      case (child: Expression, index) =>
+        val v = child.eval(input)
+        if (v == null) {
+          defaultNullValues(index)
+        } else {
+          ZorderBytesUtils.toByte(v)
+        }
     }
+    ZorderBytesUtils.interleaveMultiByteArray(binaryArr.toArray)
+  }
 
-    val binaryArr = evaluated.map(ZorderBytesUtils.toByte).toArray
-    ZorderBytesUtils.interleaveMultiByteArray(binaryArr)
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val evals = children.map(_.genCode(ctx))
+    val defaultValues = ctx.addReferenceObj("defaultValues", defaultNullValues)
+    val binaryArray = ctx.freshName("binaryArray")
+    val util = ZorderBytesUtils.getClass.getName.stripSuffix("$")
+    val inputs = evals.zipWithIndex.map {
+      case (eval, index) =>
+        s"""
+           |${eval.code}
+           |if (${eval.isNull}) {
+           |  $binaryArray[$index] = (byte[]) $defaultValues[$index];
+           |} else {
+           |  $binaryArray[$index] = $util.toByte(${eval.value});
+           |}
+           |""".stripMargin
+    }
+    ev.copy(code =
+      code"""
+         |byte[] ${ev.value} = null;
+         |byte[][] $binaryArray = new byte[${evals.length}][];
+         |${inputs.mkString("\n")}
+         |${ev.value} = $util.interleaveMultiByteArray($binaryArray);
+         |""".stripMargin,
+      isNull = FalseLiteral)
   }
 }
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index b5bddd1..00d47ca 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -19,22 +19,19 @@ package org.apache.spark.sql
 
 import scala.collection.mutable.Set
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal, Multiply}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
+import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, CustomShuffleReaderExec, QueryStageExec}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
-import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
+import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType}
+import org.apache.spark.util.Utils
 
-import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf, KyuubiSQLExtensionException}
+import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
 import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
-import org.apache.kyuubi.sql.zorder.Zorder
 
 class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper {
 
@@ -62,6 +59,8 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
     if (_spark != null) {
       _spark.stop()
     }
+    Utils.deleteRecursively(new java.io.File("spark-warehouse"))
+    Utils.deleteRecursively(new java.io.File("metastore_db"))
   }
 
   private def setupData(): Unit = {
@@ -921,143 +920,6 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
     // scalastyle:on println
   }
 
-  test("optimize unpartitioned table") {
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-      withTable("up") {
-        sql(s"DROP TABLE IF EXISTS up")
-
-        val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
-          Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
-          Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
-          Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-        sql(s"CREATE TABLE up (c1 INT, c2 INT, c3 INT)")
-        sql(s"INSERT INTO TABLE up VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
-        val e = intercept[KyuubiSQLExtensionException] {
-          sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
-        }
-        assert(e.getMessage == "Filters are only supported for partitioned table")
-
-        sql("OPTIMIZE up ZORDER BY c1, c2")
-        val res = sql("SELECT c1, c2 FROM up").collect()
-
-        assert(res.length == 16)
-
-        for (i <- target.indices) {
-          val t = target(i)
-          val r = res(i)
-          assert(t(0) == r.getInt(0))
-          assert(t(1) == r.getInt(1))
-        }
-      }
-    }
-  }
-
-  test("optimize partitioned table") {
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-      withTable("p") {
-        sql("DROP TABLE IF EXISTS p")
-
-        val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
-          Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
-          Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
-          Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-
-        sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
-        sql(s"OPTIMIZE p ZORDER BY c1, c2")
-
-        val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
-        val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
-        assert(res1.length == 16)
-        assert(res2.length == 16)
-
-        for (i <- target.indices) {
-          val t = target(i)
-          val r1 = res1(i)
-          assert(t(0) == r1.getInt(0))
-          assert(t(1) == r1.getInt(1))
-
-          val r2 = res2(i)
-          assert(t(0) == r2.getInt(0))
-          assert(t(1) == r2.getInt(1))
-        }
-      }
-    }
-  }
-
-  test("optimize partitioned table with filters") {
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-      withTable("p") {
-        sql("DROP TABLE IF EXISTS p")
-
-        val target1 = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
-          Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
-          Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
-          Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-        val target2 = Seq(Seq(0, 0), Seq(0, 1), Seq(0, 2), Seq(0, 3),
-          Seq(1, 0), Seq(1, 1), Seq(1, 2), Seq(1, 3),
-          Seq(2, 0), Seq(2, 1), Seq(2, 2), Seq(2, 3),
-          Seq(3, 0), Seq(3, 1), Seq(3, 2), Seq(3, 3))
-        sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
-        val e = intercept[KyuubiSQLExtensionException](
-          sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
-        )
-        assert(e.getMessage == "Only partition column filters are allowed")
-
-        sql(s"OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2")
-
-        val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
-        val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
-        assert(res1.length == 16)
-        assert(res2.length == 16)
-
-        for (i <- target1.indices) {
-          val t1 = target1(i)
-          val r1 = res1(i)
-          assert(t1(0) == r1.getInt(0))
-          assert(t1(1) == r1.getInt(1))
-
-          val t2 = target2(i)
-          val r2 = res2(i)
-          assert(t2(0) == r2.getInt(0))
-          assert(t2(1) == r2.getInt(1))
-        }
-      }
-    }
-  }
-
   // TODO: #1064
   // TODO: The matching rule for sql classification should be generated automatically not manually
   test("get simple name for auxiliary statement") {
@@ -1457,195 +1319,6 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
     println("auxiliary statement simple name is :" + auxiStatementSimpleName.toSeq.sorted)
     // scalastyle:on println
   }
-
-  test("optimize zorder with datasource table") {
-    // TODO remove this if we support datasource table
-    withTable("t") {
-      sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
-      val msg = intercept[KyuubiSQLExtensionException] {
-        sql("OPTIMIZE t ZORDER BY c1, c2")
-      }.getMessage
-      assert(msg.contains("only support hive table"))
-    }
-  }
-
-  private def checkZorderTable(
-      enabled: Boolean,
-      cols: String,
-      planHasRepartition: Boolean,
-      resHasSort: Boolean): Unit = {
-    def checkSort(plan: LogicalPlan): Unit = {
-      assert(plan.isInstanceOf[Sort] === resHasSort)
-      if (plan.isInstanceOf[Sort]) {
-        val refs = plan.asInstanceOf[Sort].order.head
-          .child.asInstanceOf[Zorder].children.map(_.references.head)
-        val colArr = cols.split(",")
-        assert(refs.size === colArr.size)
-        refs.zip(colArr).foreach { case (ref, col) =>
-          assert(ref.name === col.trim)
-        }
-      }
-    }
-
-    val repartition = if (planHasRepartition) {
-      "/*+ repartition */"
-    } else {
-      ""
-    }
-    withSQLConf("spark.sql.shuffle.partitions" -> "1") {
-      // hive
-      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        withTable("zorder_t1", "zorder_t2_true", "zorder_t2_false") {
-          sql(
-            s"""
-               |CREATE TABLE zorder_t1 (c1 int, c2 string, c3 long, c4 double) STORED AS PARQUET
-               |TBLPROPERTIES (
-               | 'kyuubi.zorder.enabled' = '$enabled',
-               | 'kyuubi.zorder.cols' = '$cols')
-               |""".stripMargin)
-          val df1 = sql(s"""
-                           |INSERT INTO TABLE zorder_t1
-                           |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
-                           |""".stripMargin)
-          assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHiveTable])
-          checkSort(df1.queryExecution.analyzed.children.head)
-
-          Seq("true", "false").foreach { optimized =>
-            withSQLConf("spark.sql.hive.convertMetastoreCtas" -> optimized,
-              "spark.sql.hive.convertMetastoreParquet" -> optimized) {
-              val df2 =
-                sql(
-                  s"""
-                     |CREATE TABLE zorder_t2_$optimized STORED AS PARQUET
-                     |TBLPROPERTIES (
-                     | 'kyuubi.zorder.enabled' = '$enabled',
-                     | 'kyuubi.zorder.cols' = '$cols')
-                     |
-                     |SELECT $repartition * FROM
-                     |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
-                     |""".stripMargin)
-              if (optimized.toBoolean) {
-                assert(df2.queryExecution.analyzed
-                  .isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
-              } else {
-                assert(df2.queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
-              }
-              checkSort(df2.queryExecution.analyzed.children.head)
-            }
-          }
-        }
-      }
-
-      // datasource
-      withTable("zorder_t3", "zorder_t4") {
-        sql(
-          s"""
-             |CREATE TABLE zorder_t3 (c1 int, c2 string, c3 long, c4 double) USING PARQUET
-             |TBLPROPERTIES (
-             | 'kyuubi.zorder.enabled' = '$enabled',
-             | 'kyuubi.zorder.cols' = '$cols')
-             |""".stripMargin)
-        val df1 = sql(s"""
-                         |INSERT INTO TABLE zorder_t3
-                         |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
-                         |""".stripMargin)
-        assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHadoopFsRelationCommand])
-        checkSort(df1.queryExecution.analyzed.children.head)
-
-        val df2 =
-          sql(
-            s"""
-               |CREATE TABLE zorder_t4 USING PARQUET
-               |TBLPROPERTIES (
-               | 'kyuubi.zorder.enabled' = '$enabled',
-               | 'kyuubi.zorder.cols' = '$cols')
-               |
-               |SELECT $repartition * FROM
-               |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
-               |""".stripMargin)
-        assert(df2.queryExecution.analyzed.isInstanceOf[CreateDataSourceTableAsSelectCommand])
-        checkSort(df2.queryExecution.analyzed.children.head)
-      }
-    }
-  }
-
-  test("Support insert zorder by table properties") {
-    withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "false") {
-      checkZorderTable(true, "c1", false, false)
-      checkZorderTable(false, "c1", false, false)
-    }
-    withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "true") {
-      checkZorderTable(true, "", false, false)
-      checkZorderTable(true, "c5", false, false)
-      checkZorderTable(true, "c1,c5", false, false)
-      checkZorderTable(false, "c3", false, false)
-      checkZorderTable(true, "c3", true, false)
-      checkZorderTable(true, "c3", false, true)
-      checkZorderTable(true, "c2,c4", false, true)
-      checkZorderTable(true, "c4, c2, c1, c3", false, true)
-    }
-  }
-
-  test("zorder: check unsupported data type") {
-    def checkZorderPlan(zorder: Expression): Unit = {
-      val msg = intercept[AnalysisException] {
-        val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
-        spark.sessionState.analyzer.checkAnalysis(plan)
-      }.getMessage
-      assert(msg.contains("Unsupported z-order type: null"))
-    }
-
-    checkZorderPlan(Zorder(Seq(Literal(null, NullType))))
-    checkZorderPlan(Zorder(Seq(Literal(1, IntegerType), Literal(null, NullType))))
-  }
-
-  test("zorder: check supported data type") {
-    val children = Seq(
-      Literal.create(false, BooleanType),
-      Literal.create(null, BooleanType),
-      Literal.create(1.toByte, ByteType),
-      Literal.create(null, ByteType),
-      Literal.create(1.toShort, ShortType),
-      Literal.create(null, ShortType),
-      Literal.create(1, IntegerType),
-      Literal.create(null, IntegerType),
-      Literal.create(1L, LongType),
-      Literal.create(null, LongType),
-      Literal.create(1f, FloatType),
-      Literal.create(null, FloatType),
-      Literal.create(1d, DoubleType),
-      Literal.create(null, DoubleType),
-      Literal.create("1", StringType),
-      Literal.create(null, StringType),
-      Literal.create(1L, TimestampType),
-      Literal.create(null, TimestampType),
-      Literal.create(1, DateType),
-      Literal.create(null, DateType),
-      Literal.create(BigDecimal(1, 1), DecimalType(1, 1)),
-      Literal.create(null, DecimalType(1, 1))
-    )
-    val zorder = Zorder(children)
-    val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
-    spark.sessionState.analyzer.checkAnalysis(plan)
-    assert(zorder.foldable)
-    val res = zorder.eval().asInstanceOf[Array[Byte]]
-    val expected = Array(
-      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-      0x50, 0x54, 0x15, 0x05, 0x49, 0x51, 0x54, 0x55, 0x15, 0x45,
-      0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
-      0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
-      0x44, 0x44, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0x88,
-      0x88, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0xa8, 0x9a,
-      0x2a, 0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0x2a,
-      0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0xca, 0x8a,
-      0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0x8a, 0x8a,
-      0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0xb2, 0xa2,
-      0xaa, 0x8a, 0x9a, 0xaa, 0x2a, 0x6a, 0xa8, 0xa8, 0xaa, 0xa2,
-      0xa2, 0xaa, 0x8a, 0x8a, 0xaa, 0x2f, 0x6b, 0xfc)
-      .map(_.toByte)
-    assert(java.util.Arrays.equals(res, expected))
-  }
-
   test("test watchdog with scan maxHivePartitions") {
     withTable("test", "temp") {
       sql(
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
new file mode 100644
index 0000000..f7e7215
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Expression, ExpressionEvalHelper, Literal, NullsLast, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project, Sort}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
+import org.apache.kyuubi.sql.zorder.Zorder
+
+trait ZorderSuite extends QueryTest
+  with SQLTestUtils
+  with AdaptiveSparkPlanHelper
+  with ExpressionEvalHelper {
+
+  // Session owned by this suite; it is assigned in beforeAll() and stopped in afterAll().
+  var _spark: SparkSession = _
+  // Overrides the inherited `spark` accessor so that it returns the suite-owned session.
+  override def spark: SparkSession = _spark
+
+  /**
+   * Builds the Hive-enabled local session with the Kyuubi SQL extension installed and
+   * then runs the parent setup. The suite-specific `sparkConf` is handed to the builder
+   * after the fixed settings.
+   */
+  protected override def beforeAll(): Unit = {
+    val builder = SparkSession.builder().master("local[1]")
+      .config(
+        StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+        "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
+      .config(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+      .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
+      .config("spark.hadoop.hive.metastore.client.capability.check", "false")
+      .config("spark.ui.enabled", "false")
+      .config(sparkConf)
+    _spark = builder.enableHiveSupport().getOrCreate()
+    super.beforeAll()
+  }
+
+  /**
+   * Runs the parent teardown, then stops the session and deletes the local
+   * `spark-warehouse` and `metastore_db` directories.
+   *
+   * The cleanup sits in a `finally` block so that it still happens when the parent
+   * teardown throws; otherwise a live session would be picked up by the next
+   * suite's `getOrCreate()` call.
+   */
+  protected override def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      if (_spark != null) {
+        _spark.stop()
+      }
+      Utils.deleteRecursively(new java.io.File("spark-warehouse"))
+      Utils.deleteRecursively(new java.io.File("metastore_db"))
+    }
+  }
+
+  test("optimize unpartitioned table") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      withTable("up") {
+        sql("DROP TABLE IF EXISTS up")
+
+        // Load a 4 x 4 grid of (c1, c2) values, written in c1-major order.
+        sql("CREATE TABLE up (c1 INT, c2 INT, c3 INT)")
+        sql("INSERT INTO TABLE up VALUES" +
+          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
+
+        // A WHERE clause is rejected because the table is not partitioned.
+        val error = intercept[KyuubiSQLExtensionException] {
+          sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
+        }
+        assert(error.getMessage == "Filters are only supported for partitioned table")
+
+        sql("OPTIMIZE up ZORDER BY c1, c2")
+        val rows = sql("SELECT c1, c2 FROM up").collect()
+        assert(rows.length == 16)
+
+        // Expected (c1, c2) pairs, in the order the rows are read back after OPTIMIZE.
+        val expected = Seq(
+          (0, 0), (1, 0), (0, 1), (1, 1),
+          (2, 0), (3, 0), (2, 1), (3, 1),
+          (0, 2), (1, 2), (0, 3), (1, 3),
+          (2, 2), (3, 2), (2, 3), (3, 3))
+        expected.zip(rows).foreach { case ((c1, c2), row) =>
+          assert(c1 == row.getInt(0))
+          assert(c2 == row.getInt(1))
+        }
+      }
+    }
+  }
+
+  test("optimize partitioned table") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      withTable("p") {
+        sql("DROP TABLE IF EXISTS p")
+        sql("CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
+
+        // Both partitions receive the same 16 rows, written in c1-major order.
+        val partitionIds = Seq(1, 2)
+        val values =
+          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+            "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+            "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+            "(3,0,3),(3,1,4),(3,2,9),(3,3,0)"
+        partitionIds.foreach(id => sql(s"ALTER TABLE p ADD PARTITION (id = $id)"))
+        partitionIds.foreach { id =>
+          sql(s"INSERT INTO TABLE p PARTITION (id = $id) VALUES" + values)
+        }
+
+        sql("OPTIMIZE p ZORDER BY c1, c2")
+
+        val rowsByPartition = partitionIds.map { id =>
+          sql(s"SELECT c1, c2 FROM p WHERE id = $id").collect()
+        }
+        rowsByPartition.foreach(rows => assert(rows.length == 16))
+
+        // Expected (c1, c2) pairs, in read-back order, for each of the two partitions.
+        val expected = Seq(
+          (0, 0), (1, 0), (0, 1), (1, 1),
+          (2, 0), (3, 0), (2, 1), (3, 1),
+          (0, 2), (1, 2), (0, 3), (1, 3),
+          (2, 2), (3, 2), (2, 3), (3, 3))
+        for (((c1, c2), i) <- expected.zipWithIndex; rows <- rowsByPartition) {
+          assert(c1 == rows(i).getInt(0))
+          assert(c2 == rows(i).getInt(1))
+        }
+      }
+    }
+  }
+
+  test("optimize partitioned table with filters") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      withTable("p") {
+        sql("DROP TABLE IF EXISTS p")
+        sql("CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
+
+        // Both partitions receive the same 16 rows, written in c1-major order.
+        val partitionIds = Seq(1, 2)
+        val values =
+          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
+            "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
+            "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
+            "(3,0,3),(3,1,4),(3,2,9),(3,3,0)"
+        partitionIds.foreach(id => sql(s"ALTER TABLE p ADD PARTITION (id = $id)"))
+        partitionIds.foreach { id =>
+          sql(s"INSERT INTO TABLE p PARTITION (id = $id) VALUES" + values)
+        }
+
+        // A filter on a non-partition column (c1) is rejected.
+        val error = intercept[KyuubiSQLExtensionException] {
+          sql("OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
+        }
+        assert(error.getMessage == "Only partition column filters are allowed")
+
+        // Only partition id = 1 is selected by the filter.
+        sql("OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2")
+
+        val filteredRows = sql("SELECT c1, c2 FROM p WHERE id = 1").collect()
+        val otherRows = sql("SELECT c1, c2 FROM p WHERE id = 2").collect()
+        assert(filteredRows.length == 16)
+        assert(otherRows.length == 16)
+
+        // Read-back order expected for the partition matched by the OPTIMIZE filter.
+        val expectedFiltered = Seq(
+          (0, 0), (1, 0), (0, 1), (1, 1),
+          (2, 0), (3, 0), (2, 1), (3, 1),
+          (0, 2), (1, 2), (0, 3), (1, 3),
+          (2, 2), (3, 2), (2, 3), (3, 3))
+        // The other partition is expected back in the order its rows were inserted.
+        val expectedOther = Seq(
+          (0, 0), (0, 1), (0, 2), (0, 3),
+          (1, 0), (1, 1), (1, 2), (1, 3),
+          (2, 0), (2, 1), (2, 2), (2, 3),
+          (3, 0), (3, 1), (3, 2), (3, 3))
+        expectedFiltered.indices.foreach { i =>
+          val (f1, f2) = expectedFiltered(i)
+          assert(f1 == filteredRows(i).getInt(0))
+          assert(f2 == filteredRows(i).getInt(1))
+
+          val (o1, o2) = expectedOther(i)
+          assert(o1 == otherRows(i).getInt(0))
+          assert(o2 == otherRows(i).getInt(1))
+        }
+      }
+    }
+  }
+
+  test("optimize zorder with datasource table") {
+    // TODO remove this if we support datasource table
+    withTable("t") {
+      sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
+      // OPTIMIZE on a table created with USING PARQUET must be refused.
+      val error = intercept[KyuubiSQLExtensionException](sql("OPTIMIZE t ZORDER BY c1, c2"))
+      assert(error.getMessage.contains("only support hive table"))
+    }
+  }
+
+  /**
+   * Creates tables carrying the `kyuubi.zorder.enabled` / `kyuubi.zorder.cols` properties,
+   * writes to them through INSERT and CTAS (Hive and datasource flavours), and checks the
+   * first child of each analyzed write command.
+   *
+   * @param enabled value for the `kyuubi.zorder.enabled` table property
+   * @param cols value for the `kyuubi.zorder.cols` table property (comma separated)
+   * @param planHasRepartition whether the written query carries a `repartition` hint
+   * @param resHasSort whether the child of the write command is expected to be a Sort
+   */
+  private def checkZorderTable(
+      enabled: Boolean,
+      cols: String,
+      planHasRepartition: Boolean,
+      resHasSort: Boolean): Unit = {
+    // Asserts the presence/absence of a Sort; when one is present, the children of its
+    // first sort key (a Zorder) must reference the columns listed in `cols`, in order.
+    def checkSort(plan: LogicalPlan): Unit = {
+      assert(plan.isInstanceOf[Sort] === resHasSort)
+      plan match {
+        case sort: Sort =>
+          val zorder = sort.order.head.child.asInstanceOf[Zorder]
+          val actualNames = zorder.children.map(_.references.head.name)
+          val expectedNames = cols.split(",").map(_.trim)
+          assert(actualNames.size === expectedNames.size)
+          actualNames.zip(expectedNames).foreach { case (actual, expected) =>
+            assert(actual === expected)
+          }
+        case _ =>
+      }
+    }
+
+    val repartition = if (planHasRepartition) "/*+ repartition */" else ""
+    // TBLPROPERTIES clause shared by every CREATE statement below; the continuation
+    // lines keep their single leading space.
+    val tblProperties =
+      s"""TBLPROPERTIES (
+         | 'kyuubi.zorder.enabled' = '$enabled',
+         | 'kyuubi.zorder.cols' = '$cols')""".stripMargin
+
+    withSQLConf("spark.sql.shuffle.partitions" -> "1") {
+      // hive
+      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
+        withTable("zorder_t1", "zorder_t2_true", "zorder_t2_false") {
+          sql(
+            s"""
+               |CREATE TABLE zorder_t1 (c1 int, c2 string, c3 long, c4 double) STORED AS PARQUET
+               |$tblProperties
+               |""".stripMargin)
+          val hiveInsert = sql(
+            s"""
+               |INSERT INTO TABLE zorder_t1
+               |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
+               |""".stripMargin).queryExecution.analyzed
+          assert(hiveInsert.isInstanceOf[InsertIntoHiveTable])
+          checkSort(hiveInsert.children.head)
+
+          // CTAS, once with and once without the convertMetastore* settings enabled.
+          Seq("true", "false").foreach { optimized =>
+            withSQLConf(
+              "spark.sql.hive.convertMetastoreCtas" -> optimized,
+              "spark.sql.hive.convertMetastoreParquet" -> optimized) {
+              val hiveCtas = sql(
+                s"""
+                   |CREATE TABLE zorder_t2_$optimized STORED AS PARQUET
+                   |$tblProperties
+                   |
+                   |SELECT $repartition * FROM
+                   |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
+                   |""".stripMargin).queryExecution.analyzed
+              if (optimized.toBoolean) {
+                assert(hiveCtas.isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
+              } else {
+                assert(hiveCtas.isInstanceOf[CreateHiveTableAsSelectCommand])
+              }
+              checkSort(hiveCtas.children.head)
+            }
+          }
+        }
+      }
+
+      // datasource
+      withTable("zorder_t3", "zorder_t4") {
+        sql(
+          s"""
+             |CREATE TABLE zorder_t3 (c1 int, c2 string, c3 long, c4 double) USING PARQUET
+             |$tblProperties
+             |""".stripMargin)
+        val dsInsert = sql(
+          s"""
+             |INSERT INTO TABLE zorder_t3
+             |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
+             |""".stripMargin).queryExecution.analyzed
+        assert(dsInsert.isInstanceOf[InsertIntoHadoopFsRelationCommand])
+        checkSort(dsInsert.children.head)
+
+        val dsCtas = sql(
+          s"""
+             |CREATE TABLE zorder_t4 USING PARQUET
+             |$tblProperties
+             |
+             |SELECT $repartition * FROM
+             |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
+             |""".stripMargin).queryExecution.analyzed
+        assert(dsCtas.isInstanceOf[CreateDataSourceTableAsSelectCommand])
+        checkSort(dsCtas.children.head)
+      }
+    }
+  }
+
+  test("Support insert zorder by table properties") {
+    // With the switch off, no Sort is expected whatever the table properties say.
+    withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "false") {
+      Seq(true, false).foreach { enabled =>
+        checkZorderTable(enabled, "c1", planHasRepartition = false, resHasSort = false)
+      }
+    }
+
+    withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "true") {
+      // Each case is (enabled, cols, planHasRepartition, resHasSort); the test tables
+      // have columns c1..c4, so "c5" names a column that does not exist.
+      val cases = Seq(
+        (true, "", false, false),
+        (true, "c5", false, false),
+        (true, "c1,c5", false, false),
+        (false, "c3", false, false),
+        (true, "c3", true, false),
+        (true, "c3", false, true),
+        (true, "c2,c4", false, true),
+        (true, "c4, c2, c1, c3", false, true))
+      cases.foreach { case (enabled, cols, planHasRepartition, resHasSort) =>
+        checkZorderTable(enabled, cols, planHasRepartition, resHasSort)
+      }
+    }
+  }
+
+  test("zorder: check unsupported data type") {
+    // A NullType child must fail analysis, alone or next to a supported child.
+    val childrenCases: Seq[Seq[Expression]] = Seq(
+      Seq(Literal(null, NullType)),
+      Seq(Literal(1, IntegerType), Literal(null, NullType)))
+
+    childrenCases.foreach { children =>
+      val zorder = Zorder(children)
+      val error = intercept[AnalysisException] {
+        val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
+        spark.sessionState.analyzer.checkAnalysis(plan)
+      }
+      assert(error.getMessage.contains("Unsupported z-order type: null"))
+    }
+  }
+
+  test("zorder: check supported data type") {
+    // One sample value per data type; each is followed by a null literal of the same type.
+    val samples: Seq[(Any, DataType)] = Seq(
+      (false, BooleanType),
+      (1.toByte, ByteType),
+      (1.toShort, ShortType),
+      (1, IntegerType),
+      (1L, LongType),
+      (1f, FloatType),
+      (1d, DoubleType),
+      ("1", StringType),
+      (1L, TimestampType),
+      (1, DateType),
+      (BigDecimal(1, 1), DecimalType(1, 1)))
+    val children = samples.flatMap { case (value, dataType) =>
+      Seq(Literal.create(value, dataType), Literal.create(null, dataType))
+    }
+
+    val zorder = Zorder(children)
+    // The expression must pass analysis and be foldable, since all children are literals.
+    spark.sessionState.analyzer.checkAnalysis(
+      Project(Seq(Alias(zorder, "c")()), OneRowRelation()))
+    assert(zorder.foldable)
+
+    // Expected bytes produced by evaluating the expression over the literals above.
+    val expectedBytes = Array(
+      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+      0x50, 0x54, 0x15, 0x05, 0x49, 0x51, 0x54, 0x55, 0x15, 0x45,
+      0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
+      0x51, 0x54, 0x55, 0x15, 0x45, 0x51, 0x54, 0x55, 0x15, 0x45,
+      0x44, 0x44, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0x88,
+      0x88, 0x22, 0x22, 0x08, 0x88, 0x82, 0x22, 0x20, 0xa8, 0x9a,
+      0x2a, 0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0x2a,
+      0x2a, 0x8a, 0x8a, 0xa2, 0xa2, 0xa8, 0xa8, 0xaa, 0xca, 0x8a,
+      0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0x8a, 0x8a,
+      0xa8, 0xa8, 0xaa, 0x8a, 0x8a, 0xa8, 0xa8, 0xaa, 0xb2, 0xa2,
+      0xaa, 0x8a, 0x9a, 0xaa, 0x2a, 0x6a, 0xa8, 0xa8, 0xaa, 0xa2,
+      0xa2, 0xaa, 0x8a, 0x8a, 0xaa, 0x2f, 0x6b, 0xfc)
+      .map(_.toByte)
+    checkEvaluation(zorder, expectedBytes, InternalRow.fromSeq(children))
+  }
+
+  test("sort with zorder -- int column") {
+    // TODO: add more datatype unit test
+    // Writes `input` as JSON in three partitions, reloads it into a single partition,
+    // sorts it globally by Zorder(c1, c2) and checks the result against `expected`.
+    def checkSort(input: DataFrame, expected: Seq[Row]): Unit = {
+      withTempDir { dir =>
+        val path = dir.getCanonicalPath
+        input.repartition(3).write.mode("overwrite").format("json").save(path)
+        val reloaded = spark.read.format("json").load(path).repartition(1)
+        val zorder = Zorder(Seq(col("c1").expr, col("c2").expr))
+        val ordering = SortOrder(zorder, Ascending, NullsLast, Seq.empty)
+        val sorted = Sort(Seq(ordering), global = true, reloaded.logicalPlan)
+        checkAnswer(Dataset.ofRows(spark, sorted), expected)
+      }
+    }
+
+    val session = spark
+    import session.implicits._
+
+    // generate 4 * 4 matrix
+    val len = 3
+    val matrix = spark.range(len + 1)
+      .select($"id".as("c1"), explode(sequence(lit(0), lit(len))).as("c2"))
+    val expectedMatrix = Seq(
+      Row(0, 0), Row(1, 0), Row(0, 1), Row(1, 1),
+      Row(2, 0), Row(3, 0), Row(2, 1), Row(3, 1),
+      Row(0, 2), Row(1, 2), Row(0, 3), Row(1, 3),
+      Row(2, 2), Row(3, 2), Row(2, 3), Row(3, 3))
+    checkSort(matrix, expectedMatrix)
+
+    // contains null value case.
+    val nullRow = spark.range(1).select(lit(null).cast(LongType)).as[java.lang.Long]
+    val c2Values = concat(sequence(lit(0), lit(len - 1)), array(lit(null)))
+    val matrixWithNulls = spark.range(len)
+      .union(nullRow)
+      .select($"id".as("c1"), explode(c2Values).as("c2"))
+    val expectedWithNulls = Seq(
+      Row(null, null), Row(0, null), Row(1, null), Row(2, null),
+      Row(null, 0), Row(null, 1), Row(null, 2), Row(0, 0),
+      Row(1, 0), Row(0, 1), Row(1, 1), Row(2, 0),
+      Row(2, 1), Row(0, 2), Row(1, 2), Row(2, 2))
+    checkSort(matrixWithNulls, expectedWithNulls)
+  }
+
+  /** Suite-specific settings; beforeAll() hands them to the session builder. */
+  def sparkConf(): SparkConf
+}
+
+// Runs the shared z-order tests with whole-stage code generation switched on.
+class ZorderWithCodegenEnabledSuite extends ZorderSuite {
+  override def sparkConf(): SparkConf =
+    new SparkConf().set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+}
+
+// Runs the shared z-order tests with whole-stage code generation switched off and the
+// codegen factory mode set to NO_CODEGEN.
+class ZorderWithCodegenDisabledSuite extends ZorderSuite {
+  override def sparkConf(): SparkConf =
+    new SparkConf()
+      .set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+      .set(SQLConf.CODEGEN_FACTORY_MODE.key, "NO_CODEGEN")
+}
diff --git a/pom.xml b/pom.xml
index 927e138..5c363ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
         <ldapsdk.version>5.1.4</ldapsdk.version>
         <parquet.version>1.10.1</parquet.version>
         <prometheus.version>0.10.0</prometheus.version>
+        <scalacheck.version>3.2.9.0</scalacheck.version>
         <scalatest.version>3.2.9</scalatest.version>
         <scopt.version>4.0.1</scopt.version>
         <slf4j.version>1.7.30</slf4j.version>
@@ -406,6 +407,13 @@
             </dependency>
 
             <dependency>
+                <groupId>org.scalatestplus</groupId>
+                <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+                <version>${scalacheck.version}</version>
+                <scope>test</scope>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-sql_${scala.binary.version}</artifactId>
                 <version>${spark.version}</version>