Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/10/21 02:44:06 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1268] Simplify extension code for Spark 3.2 and Spark 3.1 by unifying zorder code

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f0f065b  [KYUUBI #1268] Simplify extension code for Spark 3.2 and Spark 3.1 by unifying zorder code
f0f065b is described below

commit f0f065b983a15a577d21a3d39ae04dd7e12d058a
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Oct 21 10:43:53 2021 +0800

    [KYUUBI #1268] Simplify extension code for Spark 3.2 and Spark 3.1 by unifying zorder code
    
    ### _Why are the changes needed?_
    Reduce the file count and code size to make the extension easier to maintain.
    
    - Move all zorder code into the common module
    - Add KyuubiSparkSQLCommonExtension to the common module (a sketch of the
      unified entry point follows below)
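    
    A minimal sketch of the unified entry point, reconstructed from the rules the
    per-version extensions stop injecting in this diff (every name below appears
    in the removed lines; the exact body lives in the new
    dev/kyuubi-extension-spark-common module):
    
        object KyuubiSparkSQLCommonExtension {
          def injectCommonExtensions(extensions: SparkSessionExtensions): Unit = {
            // zorder parser and its resolution rule
            extensions.injectParser { case (_, parser) =>
              new ZorderSparkSqlExtensionsParser(parser)
            }
            extensions.injectResolutionRule(ResolveZorder)
    
            // InsertZorderBeforeWriting* must be applied before the
            // repartition/rebalance rules, because only one of Global Sort or
            // Repartition can apply
            extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
            extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
            extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
    
            extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
            extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
          }
        }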
    
    ### _How was this patch tested?_
    Pass CI.
    
    Closes #1268 from ulysses-you/simplify-extension.
    
    Closes #1268
    
    0eeb7161 [ulysses-you] add link
    c4d35c5c [ulysses-you] simplify
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |  16 +-
 .../sql/zorder/InsertZorderBeforeWriting.scala     |  39 --
 .../kyuubi/sql/zorder/OptimizeZorderCommand.scala  |  30 --
 .../sql/zorder/OptimizeZorderStatement.scala       |  29 --
 .../apache/kyuubi/sql/zorder/ResolveZorder.scala   |  32 --
 .../org/apache/kyuubi/sql/zorder/Zorder.scala      |  23 -
 .../zorder/ZorderSparkSqlExtensionsParser.scala    |  26 -
 .../kyuubi/sql/zorder/ZorderSqlAstBuilder.scala    |  32 --
 .../sql/InsertShuffleNodeBeforeJoinSuite.scala     |  85 ----
 dev/kyuubi-extension-spark-3-2/pom.xml             |   1 +
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |  17 +-
 .../sql/zorder/InsertZorderBeforeWriting.scala     |  39 --
 .../kyuubi/sql/zorder/OptimizeZorderCommand.scala  |  33 --
 .../sql/zorder/OptimizeZorderStatement.scala       |  31 --
 .../apache/kyuubi/sql/zorder/ResolveZorder.scala   |  32 --
 .../org/apache/kyuubi/sql/zorder/Zorder.scala      |  25 -
 .../zorder/ZorderSparkSqlExtensionsParser.scala    |  26 -
 .../kyuubi/sql/zorder/ZorderSqlAstBuilder.scala    |  32 --
 .../scala/org/apache/spark/sql/ZorderSuite.scala   | 533 ---------------------
 .../sql/KyuubiSparkSQLCommonExtension.scala}       |  21 +-
 .../sql/zorder/InsertZorderBeforeWritingBase.scala |  19 +
 .../sql/zorder/OptimizeZorderCommandBase.scala     |  13 +
 .../sql/zorder/OptimizeZorderStatementBase.scala   |  11 +
 .../kyuubi/sql/zorder/ResolveZorderBase.scala      |  10 +
 .../org/apache/kyuubi/sql/zorder/ZorderBase.scala  |   5 +
 .../ZorderSparkSqlExtensionsParserBase.scala       |   6 +
 .../sql/zorder/ZorderSqlAstBuilderBase.scala       |  11 +
 .../sql/InsertShuffleNodeBeforeJoinSuite.scala     |   9 +-
 .../scala/org/apache/spark/sql/ZorderSuite.scala   |  12 +-
 29 files changed, 105 insertions(+), 1093 deletions(-)

diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index c6b4b59..f5a2ade 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSessionExtensions
 
 import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
 import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxHivePartitionStrategy}
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
 
 // scalastyle:off line.size.limit
 /**
@@ -32,28 +31,15 @@ import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, Insert
 // scalastyle:on line.size.limit
 class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
-    // inject zorder parser and related rules
-    extensions.injectParser{ case (_, parser) => new ZorderSparkSqlExtensionsParser(parser) }
-    extensions.injectResolutionRule(ResolveZorder)
-
+    KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
     // a helper rule for ForcedMaxOutputRowsRule
     extensions.injectResolutionRule(MarkAggregateOrderRule)
 
-    // Note that:
-    // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
-    // should be applied before
-    // RepartitionBeforeWrite and RepartitionBeforeWriteHive
-    // because we can only apply one of them (i.e. Global Sort or Repartition)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
     extensions.injectPostHocResolutionRule(KyuubiSqlClassification)
     extensions.injectPostHocResolutionRule(RepartitionBeforeWritingDatasource)
     extensions.injectPostHocResolutionRule(RepartitionBeforeWritingHive)
-    extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
     extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
 
-    extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
-    extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
     extensions.injectPlannerStrategy(MaxHivePartitionStrategy)
   }
 }
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
deleted file mode 100644
index 7b4d215..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-/**
- * TODO: shall we forbid zorder if it's a dynamic partition insert?
- * Insert zorder before writing a datasource table if the target table's properties contain zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
-  extends InsertZorderBeforeWritingDatasourceBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's a dynamic partition insert?
- * Insert zorder before writing a hive table if the target table's properties contain zorder properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
-  extends InsertZorderBeforeWritingHiveBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
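
For context on the two rules deleted above: they trigger on table properties rather
than on SQL syntax. A hedged example of a table that opts in; the property keys are
taken from the ZorderSuite deleted later in this diff, while the table name and
schema are made up:

    // Hypothetical table whose writes get a zorder Sort injected
    spark.sql(
      """CREATE TABLE zorder_demo (c1 INT, c2 STRING) STORED AS PARQUET
        |TBLPROPERTIES (
        |  'kyuubi.zorder.enabled' = 'true',
        |  'kyuubi.zorder.cols' = 'c1,c2')
        |""".stripMargin)
    // With KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING enabled (also exercised in
    // the suite), later INSERTs into zorder_demo are sorted by zorder(c1, c2)
    // before the write instead of being merely repartitioned.
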
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
deleted file mode 100644
index 401646b..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A runnable command for zorder; we delegate to the real command to execute
- */
-case class OptimizeZorderCommand(
-    catalogTable: CatalogTable,
-    query: LogicalPlan)
-  extends OptimizeZorderCommandBase {
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
deleted file mode 100644
index 55a5dcd..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A zorder statement that contains the query we parsed from SQL.
- * We should convert this plan to a concrete command in the Analyzer.
- */
-case class OptimizeZorderStatement(
-    tableIdentifier: Seq[String],
-    query: LogicalPlan) extends OptimizeZorderStatementBase {
-}
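
For reference, the SQL this statement node is parsed from, as exercised by the
ZorderSuite deleted later in this diff (table and column names are the suite's):

    // Whole-table optimize (hive tables only, per the suite):
    spark.sql("OPTIMIZE p ZORDER BY c1, c2")
    // Partition-pruned optimize; only partition-column filters are allowed:
    spark.sql("OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2")
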
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
deleted file mode 100644
index 21888b3..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
- */
-case class ResolveZorder(session: SparkSession) extends ResolveZorderBase {
-  override def buildOptimizeZorderCommand(
-      catalogTable: CatalogTable, query: LogicalPlan): OptimizeZorderCommandBase = {
-    OptimizeZorderCommand(catalogTable, query)
-  }
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
deleted file mode 100644
index c5916f5..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-case class Zorder(children: Seq[Expression]) extends ZorderBase {
-}
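
The deleted case class is deliberately empty: the actual work (interleaving the
bits of the child values so that a one-dimensional sort preserves multi-column
locality) lives in ZorderBase in the common module, which this diff does not show.
As an illustration of the idea only, not Kyuubi's implementation (which also
handles nulls, signs and non-integral types), two non-negative Ints can be
interleaved like this:

    // Classic Morton/Z-order bit interleaving for two non-negative Ints (sketch).
    def interleave(x: Int, y: Int): Long = {
      var z = 0L
      var i = 0
      while (i < 32) {
        z |= ((x >> i) & 1L) << (2 * i)      // x supplies the even bits
        z |= ((y >> i) & 1L) << (2 * i + 1)  // y supplies the odd bits
        i += 1
      }
      z
    }
    // Sorting pairs by interleave(c1, c2) yields exactly the order the
    // ZorderSuite below expects: (0,0), (1,0), (0,1), (1,1), (2,0), (3,0), ...
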
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
deleted file mode 100644
index b37fb83..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-
-class ZorderSparkSqlExtensionsParser(
-    override val delegate: ParserInterface)
-  extends ZorderSparkSqlExtensionsParserBase {
-  def astBuilder: ZorderSqlAstBuilderBase = new ZorderSqlAstBuilder
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
deleted file mode 100644
index 767cbd2..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-class ZorderSqlAstBuilder extends ZorderSqlAstBuilderBase {
-  override def buildZorder(child: Seq[Expression]): ZorderBase = {
-    Zorder(child)
-  }
-
-  override def buildOptimizeZorderStatement(
-      tableIdentifier: Seq[String], query: LogicalPlan): OptimizeZorderStatementBase = {
-    OptimizeZorderStatement(tableIdentifier, query)
-  }
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
deleted file mode 100644
index 7abeee4..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
-import org.apache.spark.sql.internal.SQLConf
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-class InsertShuffleNodeBeforeJoinSuite extends KyuubiSparkSQLExtensionTest {
-  protected override def beforeAll(): Unit = {
-    super.beforeAll()
-    setupData()
-  }
-
-  test("force shuffle before join") {
-    def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
-      var expectedResult: Seq[Row] = Seq.empty
-      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-        expectedResult = sql(sqlString).collect()
-      }
-      val df = sql(sqlString)
-      checkAnswer(df, expectedResult)
-      assert(
-        collect(df.queryExecution.executedPlan) {
-          case shuffle: ShuffleExchangeLike
-            if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle
-        }.size == num)
-    }
-
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") {
-      Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint =>
-        // positive case
-        checkShuffleNodeNum(
-          s"""
-             |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
-             | JOIN t2 ON t1.c1 = t2.c1
-             | JOIN t3 ON t1.c1 = t3.c1
-             | """.stripMargin, 4)
-
-        // negative case
-        checkShuffleNodeNum(
-          s"""
-             |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
-             | JOIN t2 ON t1.c1 = t2.c1
-             | JOIN t3 ON t1.c2 = t3.c2
-             | """.stripMargin, 4)
-      }
-
-      checkShuffleNodeNum(
-        """
-          |SELECT t1.c1, t2.c1, t3.c2 from t1
-          | JOIN t2 ON t1.c1 = t2.c1
-          | JOIN (
-          |  SELECT c2, count(*) FROM t1 GROUP BY c2
-          | ) t3 ON t1.c1 = t3.c2
-          | """.stripMargin, 5)
-
-      checkShuffleNodeNum(
-        """
-          |SELECT t1.c1, t2.c1, t3.c1 from t1
-          | JOIN t2 ON t1.c1 = t2.c1
-          | JOIN (
-          |  SELECT c1, count(*) FROM t1 GROUP BY c1
-          | ) t3 ON t1.c1 = t3.c1
-          | """.stripMargin, 5)
-    }
-  }
-}
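
The suite deleted above now only needs to exist once; the diffstat shows another
InsertShuffleNodeBeforeJoinSuite (presumably the common module's copy) being
updated instead. Outside of tests, the rule hangs off a single switch; a usage
sketch referencing only the constant the test itself uses:

    // withSQLConf above is a test-only helper; in application code one can set:
    spark.conf.set(KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key, "true")
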
diff --git a/dev/kyuubi-extension-spark-3-2/pom.xml b/dev/kyuubi-extension-spark-3-2/pom.xml
index 92d8064..5cf2f9b 100644
--- a/dev/kyuubi-extension-spark-3-2/pom.xml
+++ b/dev/kyuubi-extension-spark-3-2/pom.xml
@@ -23,6 +23,7 @@
         <groupId>org.apache.kyuubi</groupId>
         <artifactId>kyuubi-parent</artifactId>
         <version>1.4.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index f2b9ab1..2de572f 100644
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,8 +19,6 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.SparkSessionExtensions
 
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
-
 // scalastyle:off line.size.limit
 /**
  * Depending on the Spark SQL Extension framework, we can use this extension by following the steps
@@ -30,22 +28,9 @@ import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, Insert
 // scalastyle:on line.size.limit
 class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
-    // inject zorder parser and related rules
-    extensions.injectParser{ case (_, parser) => new ZorderSparkSqlExtensionsParser(parser) }
-    extensions.injectResolutionRule(ResolveZorder)
+    KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
 
-    // Note that:
-    // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
-    // should be applied before
-    // RebalanceBeforeWritingDatasource and RebalanceBeforeWritingHive
-    // because we can only apply one of them (i.e. Global Sort or Rebalance)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
-    extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
     extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource)
     extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive)
-    extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
-
-    extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
-    extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
   }
 }
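
The scaladoc elided above describes how to enable the extension; for completeness,
it is wired in through Spark's standard extension mechanism (the class name comes
from this diff, nothing else is assumed):

    // Register the Kyuubi extension when building a SparkSession:
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
      .getOrCreate()
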
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
deleted file mode 100644
index 7b4d215..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-/**
- * TODO: shall we forbid zorder if it's a dynamic partition insert?
- * Insert zorder before writing a datasource table if the target table's properties contain zorder properties
- */
-case class InsertZorderBeforeWritingDatasource(session: SparkSession)
-  extends InsertZorderBeforeWritingDatasourceBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
-
-/**
- * TODO: shall we forbid zorder if it's a dynamic partition insert?
- * Insert zorder before writing a hive table if the target table's properties contain zorder properties
- */
-case class InsertZorderBeforeWritingHive(session: SparkSession)
-  extends InsertZorderBeforeWritingHiveBase {
-  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
deleted file mode 100644
index 9f0d58c..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommand.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A runnable command for zorder; we delegate to the real command to execute
- */
-case class OptimizeZorderCommand(
-    catalogTable: CatalogTable,
-    query: LogicalPlan)
-  extends OptimizeZorderCommandBase {
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
-    copy(query = newChild)
-  }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
deleted file mode 100644
index 785b5cb..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatement.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * A zorder statement that contains the query we parsed from SQL.
- * We should convert this plan to a concrete command in the Analyzer.
- */
-case class OptimizeZorderStatement(
-    tableIdentifier: Seq[String],
-    query: LogicalPlan) extends OptimizeZorderStatementBase {
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(query = newChild)
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
deleted file mode 100644
index 21888b3..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
- */
-case class ResolveZorder(session: SparkSession) extends ResolveZorderBase {
-  override def buildOptimizeZorderCommand(
-      catalogTable: CatalogTable, query: LogicalPlan): OptimizeZorderCommandBase = {
-    OptimizeZorderCommand(catalogTable, query)
-  }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
deleted file mode 100644
index be2f1ba..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/Zorder.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-case class Zorder(children: Seq[Expression]) extends ZorderBase {
-  override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
-    copy(children = newChildren)
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
deleted file mode 100644
index b37fb83..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParser.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-
-class ZorderSparkSqlExtensionsParser(
-    override val delegate: ParserInterface)
-  extends ZorderSparkSqlExtensionsParserBase {
-  def astBuilder: ZorderSqlAstBuilderBase = new ZorderSqlAstBuilder
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala b/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
deleted file mode 100644
index 767cbd2..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilder.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.zorder
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-class ZorderSqlAstBuilder extends ZorderSqlAstBuilderBase {
-  override def buildZorder(child: Seq[Expression]): ZorderBase = {
-    Zorder(child)
-  }
-
-  override def buildOptimizeZorderStatement(
-      tableIdentifier: Seq[String], query: LogicalPlan): OptimizeZorderStatementBase = {
-    OptimizeZorderStatement(tableIdentifier, query)
-  }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
deleted file mode 100644
index 7c03abc..0000000
--- a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++ /dev/null
@@ -1,533 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Expression, ExpressionEvalHelper, Literal, NullsLast, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project, Sort}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
-import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
-
-import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
-import org.apache.kyuubi.sql.zorder.{OptimizeZorderCommandBase, Zorder}
-
-trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper {
-
-  test("optimize unpartitioned table") {
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-      withTable("up") {
-        sql(s"DROP TABLE IF EXISTS up")
-
-        val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
-          Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
-          Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
-          Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-        sql(s"CREATE TABLE up (c1 INT, c2 INT, c3 INT)")
-        sql(s"INSERT INTO TABLE up VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
-        val e = intercept[KyuubiSQLExtensionException] {
-          sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
-        }
-        assert(e.getMessage == "Filters are only supported for partitioned table")
-
-        sql("OPTIMIZE up ZORDER BY c1, c2")
-        val res = sql("SELECT c1, c2 FROM up").collect()
-
-        assert(res.length == 16)
-
-        for (i <- target.indices) {
-          val t = target(i)
-          val r = res(i)
-          assert(t(0) == r.getInt(0))
-          assert(t(1) == r.getInt(1))
-        }
-      }
-    }
-  }
-
-  test("optimize partitioned table") {
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-      withTable("p") {
-        sql("DROP TABLE IF EXISTS p")
-
-        val target = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
-          Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
-          Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
-          Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-
-        sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
-        sql(s"OPTIMIZE p ZORDER BY c1, c2")
-
-        val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
-        val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
-        assert(res1.length == 16)
-        assert(res2.length == 16)
-
-        for (i <- target.indices) {
-          val t = target(i)
-          val r1 = res1(i)
-          assert(t(0) == r1.getInt(0))
-          assert(t(1) == r1.getInt(1))
-
-          val r2 = res2(i)
-          assert(t(0) == r2.getInt(0))
-          assert(t(1) == r2.getInt(1))
-        }
-      }
-    }
-  }
-
-  test("optimize partitioned table with filters") {
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-      withTable("p") {
-        sql("DROP TABLE IF EXISTS p")
-
-        val target1 = Seq(Seq(0, 0), Seq(1, 0), Seq(0, 1), Seq(1, 1),
-          Seq(2, 0), Seq(3, 0), Seq(2, 1), Seq(3, 1),
-          Seq(0, 2), Seq(1, 2), Seq(0, 3), Seq(1, 3),
-          Seq(2, 2), Seq(3, 2), Seq(2, 3), Seq(3, 3))
-        val target2 = Seq(Seq(0, 0), Seq(0, 1), Seq(0, 2), Seq(0, 3),
-          Seq(1, 0), Seq(1, 1), Seq(1, 2), Seq(1, 3),
-          Seq(2, 0), Seq(2, 1), Seq(2, 2), Seq(2, 3),
-          Seq(3, 0), Seq(3, 1), Seq(3, 2), Seq(3, 3))
-        sql(s"CREATE TABLE p (c1 INT, c2 INT, c3 INT) PARTITIONED BY (id INT)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 1)")
-        sql(s"ALTER TABLE p ADD PARTITION (id = 2)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 1) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-        sql(s"INSERT INTO TABLE p PARTITION (id = 2) VALUES" +
-          "(0,0,2),(0,1,2),(0,2,1),(0,3,3)," +
-          "(1,0,4),(1,1,2),(1,2,1),(1,3,3)," +
-          "(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
-          "(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
-
-        val e = intercept[KyuubiSQLExtensionException](
-          sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
-        )
-        assert(e.getMessage == "Only partition column filters are allowed")
-
-        sql(s"OPTIMIZE p WHERE id = 1 ZORDER BY c1, c2")
-
-        val res1 = sql(s"SELECT c1, c2 FROM p WHERE id = 1").collect()
-        val res2 = sql(s"SELECT c1, c2 FROM p WHERE id = 2").collect()
-
-        assert(res1.length == 16)
-        assert(res2.length == 16)
-
-        for (i <- target1.indices) {
-          val t1 = target1(i)
-          val r1 = res1(i)
-          assert(t1(0) == r1.getInt(0))
-          assert(t1(1) == r1.getInt(1))
-
-          val t2 = target2(i)
-          val r2 = res2(i)
-          assert(t2(0) == r2.getInt(0))
-          assert(t2(1) == r2.getInt(1))
-        }
-      }
-    }
-  }
-
-  test("optimize zorder with datasource table") {
-    // TODO remove this if we support datasource table
-    withTable("t") {
-      sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
-      val msg = intercept[KyuubiSQLExtensionException] {
-        sql("OPTIMIZE t ZORDER BY c1, c2")
-      }.getMessage
-      assert(msg.contains("only support hive table"))
-    }
-  }
-
-  private def checkZorderTable(
-      enabled: Boolean,
-      cols: String,
-      planHasRepartition: Boolean,
-      resHasSort: Boolean): Unit = {
-    def checkSort(plan: LogicalPlan): Unit = {
-      assert(plan.isInstanceOf[Sort] === resHasSort)
-      if (plan.isInstanceOf[Sort]) {
-        val colArr = cols.split(",")
-        val refs = if (colArr.length == 1) {
-          plan.asInstanceOf[Sort].order.head
-            .child.asInstanceOf[AttributeReference] :: Nil
-        } else {
-          plan.asInstanceOf[Sort].order.head
-            .child.asInstanceOf[Zorder].children.map(_.references.head)
-        }
-        assert(refs.size === colArr.size)
-        refs.zip(colArr).foreach { case (ref, col) =>
-          assert(ref.name === col.trim)
-        }
-      }
-    }
-
-    val repartition = if (planHasRepartition) {
-      "/*+ repartition */"
-    } else {
-      ""
-    }
-    withSQLConf("spark.sql.shuffle.partitions" -> "1") {
-      // hive
-      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        withTable("zorder_t1", "zorder_t2_true", "zorder_t2_false") {
-          sql(
-            s"""
-               |CREATE TABLE zorder_t1 (c1 int, c2 string, c3 long, c4 double) STORED AS PARQUET
-               |TBLPROPERTIES (
-               | 'kyuubi.zorder.enabled' = '$enabled',
-               | 'kyuubi.zorder.cols' = '$cols')
-               |""".stripMargin)
-          val df1 = sql(s"""
-                           |INSERT INTO TABLE zorder_t1
-                           |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
-                           |""".stripMargin)
-          assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHiveTable])
-          checkSort(df1.queryExecution.analyzed.children.head)
-
-          Seq("true", "false").foreach { optimized =>
-            withSQLConf("spark.sql.hive.convertMetastoreCtas" -> optimized,
-              "spark.sql.hive.convertMetastoreParquet" -> optimized) {
-              val df2 =
-                sql(
-                  s"""
-                     |CREATE TABLE zorder_t2_$optimized STORED AS PARQUET
-                     |TBLPROPERTIES (
-                     | 'kyuubi.zorder.enabled' = '$enabled',
-                     | 'kyuubi.zorder.cols' = '$cols')
-                     |
-                     |SELECT $repartition * FROM
-                     |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
-                     |""".stripMargin)
-              if (optimized.toBoolean) {
-                assert(df2.queryExecution.analyzed
-                  .isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
-              } else {
-                assert(df2.queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
-              }
-              checkSort(df2.queryExecution.analyzed.children.head)
-            }
-          }
-        }
-      }
-
-      // datasource
-      withTable("zorder_t3", "zorder_t4") {
-        sql(
-          s"""
-             |CREATE TABLE zorder_t3 (c1 int, c2 string, c3 long, c4 double) USING PARQUET
-             |TBLPROPERTIES (
-             | 'kyuubi.zorder.enabled' = '$enabled',
-             | 'kyuubi.zorder.cols' = '$cols')
-             |""".stripMargin)
-        val df1 = sql(s"""
-                         |INSERT INTO TABLE zorder_t3
-                         |SELECT $repartition * FROM VALUES(1,'a',2,4D),(2,'b',3,6D)
-                         |""".stripMargin)
-        assert(df1.queryExecution.analyzed.isInstanceOf[InsertIntoHadoopFsRelationCommand])
-        checkSort(df1.queryExecution.analyzed.children.head)
-
-        val df2 =
-          sql(
-            s"""
-               |CREATE TABLE zorder_t4 USING PARQUET
-               |TBLPROPERTIES (
-               | 'kyuubi.zorder.enabled' = '$enabled',
-               | 'kyuubi.zorder.cols' = '$cols')
-               |
-               |SELECT $repartition * FROM
-               |VALUES(1,'a',2,4D),(2,'b',3,6D) AS t(c1 ,c2 , c3, c4)
-               |""".stripMargin)
-        assert(df2.queryExecution.analyzed.isInstanceOf[CreateDataSourceTableAsSelectCommand])
-        checkSort(df2.queryExecution.analyzed.children.head)
-      }
-    }
-  }
-
-  test("Support insert zorder by table properties") {
-    withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "false") {
-      checkZorderTable(true, "c1", false, false)
-      checkZorderTable(false, "c1", false, false)
-    }
-    withSQLConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING.key -> "true") {
-      checkZorderTable(true, "", false, false)
-      checkZorderTable(true, "c5", false, false)
-      checkZorderTable(true, "c1,c5", false, false)
-      checkZorderTable(false, "c3", false, false)
-      checkZorderTable(true, "c3", true, false)
-      checkZorderTable(true, "c3", false, true)
-      checkZorderTable(true, "c2,c4", false, true)
-      checkZorderTable(true, "c4, c2, c1, c3", false, true)
-    }
-  }
-
-  test("zorder: check unsupported data type") {
-    def checkZorderPlan(zorder: Expression): Unit = {
-      val msg = intercept[AnalysisException] {
-        val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
-        spark.sessionState.analyzer.checkAnalysis(plan)
-      }.getMessage
-      assert(msg.contains("Unsupported z-order type: void"))
-    }
-
-    checkZorderPlan(Zorder(Seq(Literal(null, NullType))))
-    checkZorderPlan(Zorder(Seq(Literal(1, IntegerType), Literal(null, NullType))))
-  }
-
-  test("zorder: check supported data type") {
-    val children = Seq(
-      Literal.create(false, BooleanType),
-      Literal.create(null, BooleanType),
-      Literal.create(1.toByte, ByteType),
-      Literal.create(null, ByteType),
-      Literal.create(1.toShort, ShortType),
-      Literal.create(null, ShortType),
-      Literal.create(1, IntegerType),
-      Literal.create(null, IntegerType),
-      Literal.create(1L, LongType),
-      Literal.create(null, LongType),
-      Literal.create(1f, FloatType),
-      Literal.create(null, FloatType),
-      Literal.create(1d, DoubleType),
-      Literal.create(null, DoubleType),
-      Literal.create("1", StringType),
-      Literal.create(null, StringType),
-      Literal.create(1L, TimestampType),
-      Literal.create(null, TimestampType),
-      Literal.create(1, DateType),
-      Literal.create(null, DateType),
-      Literal.create(BigDecimal(1, 1), DecimalType(1, 1)),
-      Literal.create(null, DecimalType(1, 1))
-    )
-    val zorder = Zorder(children)
-    val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
-    spark.sessionState.analyzer.checkAnalysis(plan)
-    assert(zorder.foldable)
-
-//    // scalastyle:off
-//    val resultGen = org.apache.commons.codec.binary.Hex.encodeHex(
-//      zorder.eval(InternalRow.fromSeq(children)).asInstanceOf[Array[Byte]], false)
-//    resultGen.grouped(2).zipWithIndex.foreach { case (char, i) =>
-//      print("0x" + char(0) + char(1) + ", ")
-//      if ((i + 1) % 10 == 0) {
-//        println()
-//      }
-//    }
-//    // scalastyle:on
-
-    val expected = Array(
-      0xFB, 0xEA, 0xAA, 0xBA, 0xAE, 0xAB, 0xAA, 0xEA, 0xBA, 0xAE,
-      0xAB, 0xAA, 0xEA, 0xBA, 0xA6, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
-      0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
-      0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
-      0xBA, 0xBB, 0xAA, 0xAA, 0xAA, 0xBA, 0xAA, 0xBA, 0xAA, 0xBA,
-      0xAA, 0xBA, 0xAA, 0xBA, 0xAA, 0xBA, 0xAA, 0x9A, 0xAA, 0xAA,
-      0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
-      0xAA, 0xAA, 0xAA, 0xAA, 0xEA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
-      0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
-      0xAA, 0xAA, 0xBE, 0xAA, 0xAA, 0x8A, 0xBA, 0xAA, 0x2A, 0xEA,
-      0xA8, 0xAA, 0xAA, 0xA2, 0xAA, 0xAA, 0x8A, 0xAA, 0xAA, 0x2F,
-      0xEB, 0xFC)
-      .map(_.toByte)
-    checkEvaluation(zorder, expected, InternalRow.fromSeq(children))
-  }
-
-  private def checkSort(input: DataFrame, expected: Seq[Row], dataType: Array[DataType]): Unit = {
-    withTempDir { dir =>
-      input.repartition(3).write.mode("overwrite").format("parquet").save(dir.getCanonicalPath)
-      val df = spark.read.format("parquet")
-        .load(dir.getCanonicalPath)
-        .repartition(1)
-      assert(df.schema.fields.map(_.dataType).sameElements(dataType))
-      val exprs = Seq("c1", "c2").map(col).map(_.expr)
-      val sortOrder = SortOrder(Zorder(exprs), Ascending, NullsLast, Seq.empty)
-      val zorderSort = Sort(Seq(sortOrder), true, df.logicalPlan)
-      val result = Dataset.ofRows(spark, zorderSort)
-      checkAnswer(result, expected)
-    }
-  }
-
-  test("sort with zorder -- int column") {
-    // TODO: add more datatype unit test
-    val session = spark
-    import session.implicits._
-    // generate 4 * 4 matrix
-    val len = 3
-    val input = spark.range(len + 1).selectExpr("cast(id as int) as c1")
-      .select($"c1", explode(sequence(lit(0), lit(len))) as "c2")
-    val expected =
-      Row(0, 0) :: Row(1, 0) :: Row(0, 1) :: Row(1, 1) ::
-        Row(2, 0) :: Row(3, 0) :: Row(2, 1) :: Row(3, 1) ::
-        Row(0, 2) :: Row(1, 2) :: Row(0, 3) :: Row(1, 3) ::
-        Row(2, 2) :: Row(3, 2) :: Row(2, 3) :: Row(3, 3) :: Nil
-    checkSort(input, expected, Array(IntegerType, IntegerType))
-
-    // contains null value case.
-    val nullDF = spark.range(1).selectExpr("cast(null as int) as c1")
-    val input2 = spark.range(len).selectExpr("cast(id as int) as c1")
-      .union(nullDF)
-      .select(
-        $"c1",
-        explode(concat(sequence(lit(0), lit(len - 1)), array(lit(null)))) as "c2")
-    val expected2 = Row(0, 0) :: Row(1, 0) :: Row(0, 1) :: Row(1, 1) ::
-      Row(2, 0) :: Row(2, 1) :: Row(0, 2) :: Row(1, 2) ::
-      Row(2, 2) :: Row(null, 0) :: Row(null, 1) :: Row(null, 2) ::
-      Row(0, null) :: Row(1, null) :: Row(2, null) :: Row(null, null) :: Nil
-    checkSort(input2, expected2, Array(IntegerType, IntegerType))
-  }
-
-  test("sort with zorder -- string column") {
-    val schema = StructType(StructField("c1", StringType) :: StructField("c2", StringType) :: Nil)
-    val rdd = spark.sparkContext.parallelize(Seq(
-      Row("a", "a"), Row("a", "b"), Row("a", "c"), Row("a", "d"),
-      Row("b", "a"), Row("b", "b"), Row("b", "c"), Row("b", "d"),
-      Row("c", "a"), Row("c", "b"), Row("c", "c"), Row("c", "d"),
-      Row("d", "a"), Row("d", "b"), Row("d", "c"), Row("d", "d")))
-    val input = spark.createDataFrame(rdd, schema)
-    val expected = Row("a", "a") :: Row("b", "a") :: Row("c", "a") :: Row("a", "b") ::
-        Row("a", "c") :: Row("b", "b") :: Row("c", "b") :: Row("b", "c") ::
-        Row("c", "c") :: Row("d", "a") :: Row("d", "b") :: Row("d", "c") ::
-        Row("a", "d") :: Row("b", "d") :: Row("c", "d") :: Row("d", "d") :: Nil
-    checkSort(input, expected, Array(StringType, StringType))
-
-    val rdd2 = spark.sparkContext.parallelize(Seq(
-      Row(null, "a"), Row("a", "b"), Row("a", "c"), Row("a", null),
-      Row("b", "a"), Row(null, "b"), Row("b", null), Row("b", "d"),
-      Row("c", "a"), Row("c", null), Row(null, "c"), Row("c", "d"),
-      Row("d", null), Row("d", "b"), Row("d", "c"), Row(null, "d"), Row(null, null)))
-    val input2 = spark.createDataFrame(rdd2, schema)
-    val expected2 = Row("b", "a") :: Row("c", "a") :: Row("a", "b") :: Row("a", "c") ::
-      Row("d", "b") :: Row("d", "c") :: Row("b", "d") :: Row("c", "d") ::
-      Row(null, "a") :: Row(null, "b") :: Row(null, "c") :: Row(null, "d") ::
-      Row("a", null) :: Row("b", null) :: Row("c", null) :: Row("d", null) ::
-      Row(null, null) :: Nil
-    checkSort(input2, expected2, Array(StringType, StringType))
-  }
-
-  test("test special value of short int long type") {
-    val df1 = spark.createDataFrame(Seq(
-      (-1, -1L),
-      (Int.MinValue, Int.MinValue.toLong),
-      (1, 1L),
-      (Int.MaxValue - 1, Int.MaxValue.toLong),
-      (Int.MaxValue - 1, Int.MaxValue.toLong - 1),
-      (Int.MaxValue, Int.MaxValue.toLong + 1),
-      (Int.MaxValue, Int.MaxValue.toLong))).toDF("c1", "c2")
-    val expected1 =
-      Row(Int.MinValue, Int.MinValue.toLong) ::
-        Row(-1, -1L) ::
-        Row(1, 1L) ::
-        Row(Int.MaxValue - 1, Int.MaxValue.toLong - 1) ::
-        Row(Int.MaxValue - 1, Int.MaxValue.toLong) ::
-        Row(Int.MaxValue, Int.MaxValue.toLong) ::
-        Row(Int.MaxValue, Int.MaxValue.toLong + 1) :: Nil
-    checkSort(df1, expected1, Array(IntegerType, LongType))
-
-    val df2 = spark.createDataFrame(Seq(
-      (-1, -1.toShort),
-      (Short.MinValue.toInt, Short.MinValue),
-      (1, 1.toShort),
-      (Short.MaxValue.toInt, (Short.MaxValue - 1).toShort),
-      (Short.MaxValue.toInt + 1, (Short.MaxValue - 1).toShort),
-      (Short.MaxValue.toInt, Short.MaxValue),
-      (Short.MaxValue.toInt + 1, Short.MaxValue))).toDF("c1", "c2")
-    val expected2 =
-      Row(Short.MinValue.toInt, Short.MinValue) ::
-        Row(-1, -1.toShort) ::
-        Row(1, 1.toShort) ::
-        Row(Short.MaxValue.toInt, Short.MaxValue - 1) ::
-        Row(Short.MaxValue.toInt, Short.MaxValue) ::
-        Row(Short.MaxValue.toInt + 1, Short.MaxValue - 1) ::
-        Row(Short.MaxValue.toInt + 1, Short.MaxValue) :: Nil
-    checkSort(df2, expected2, Array(IntegerType, ShortType))
-
-    val df3 = spark.createDataFrame(Seq(
-      (-1L, -1.toShort),
-      (Short.MinValue.toLong, Short.MinValue),
-      (1L, 1.toShort),
-      (Short.MaxValue.toLong, (Short.MaxValue - 1).toShort),
-      (Short.MaxValue.toLong + 1, (Short.MaxValue - 1).toShort),
-      (Short.MaxValue.toLong, Short.MaxValue),
-      (Short.MaxValue.toLong + 1, Short.MaxValue))).toDF("c1", "c2")
-    val expected3 =
-      Row(Short.MinValue.toLong, Short.MinValue) ::
-        Row(-1L, -1.toShort) ::
-        Row(1L, 1.toShort) ::
-        Row(Short.MaxValue.toLong, Short.MaxValue - 1) ::
-        Row(Short.MaxValue.toLong, Short.MaxValue) ::
-        Row(Short.MaxValue.toLong + 1, Short.MaxValue - 1) ::
-        Row(Short.MaxValue.toLong + 1, Short.MaxValue) :: Nil
-    checkSort(df3, expected3, Array(LongType, ShortType))
-  }
-
-  test("skip zorder if only requires one column") {
-    withTable("t") {
-      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        sql("CREATE TABLE t (c1 int, c2 string) stored as parquet")
-        val order1 = sql("OPTIMIZE t ZORDER BY c1").queryExecution.analyzed
-          .asInstanceOf[OptimizeZorderCommandBase].query.asInstanceOf[Sort].order.head.child
-        assert(!order1.isInstanceOf[Zorder])
-        assert(order1.isInstanceOf[AttributeReference])
-      }
-    }
-  }
-}
-
-class ZorderWithCodegenEnabledSuite extends ZorderSuite {
-  override def sparkConf(): SparkConf = {
-    val conf = super.sparkConf
-    conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
-    conf
-  }
-}
-
-class ZorderWithCodegenDisabledSuite extends ZorderSuite {
-  override def sparkConf(): SparkConf = {
-    val conf = super.sparkConf
-    conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
-    conf.set(SQLConf.CODEGEN_FACTORY_MODE.key, "NO_CODEGEN")
-    conf
-  }
-}
diff --git a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
similarity index 75%
copy from dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
copy to dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index f2b9ab1..a6b4f86 100644
--- a/dev/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -21,15 +21,14 @@ import org.apache.spark.sql.SparkSessionExtensions
 
 import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
 
-// scalastyle:off line.size.limit
-/**
- * Depend on Spark SQL Extension framework, we can use this extension follow steps
- *   1. move this jar into $SPARK_HOME/jars
- *   2. add config into `spark-defaults.conf`: `spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension`
- */
-// scalastyle:on line.size.limit
-class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
+class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
   override def apply(extensions: SparkSessionExtensions): Unit = {
+    KyuubiSparkSQLCommonExtension.injectCommonExtensions(extensions)
+  }
+}
+
+object KyuubiSparkSQLCommonExtension {
+  def injectCommonExtensions(extensions: SparkSessionExtensions): Unit = {
     // inject zorder parser and related rules
     extensions.injectParser{ case (_, parser) => new ZorderSparkSqlExtensionsParser(parser) }
     extensions.injectResolutionRule(ResolveZorder)
@@ -37,12 +36,10 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
     // Note that:
     // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
     // should be applied before
-    // RebalanceBeforeWritingDatasource and RebalanceBeforeWritingHive
-    // because we can only apply one of them (i.e. Global Sort or Rebalance)
+    // RepartitionBeforeWriting and RebalanceBeforeWriting
+    // because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
     extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
     extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
-    extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource)
-    extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive)
     extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
 
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
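
The common extension can be enabled on its own, as the relocated test suites further below do via StaticSQLConf.SPARK_SESSION_EXTENSIONS. A minimal sketch, assuming the jar is already on the driver classpath (the builder-based wiring here is illustrative, equivalent to setting spark.sql.extensions in spark-defaults.conf):

    // Sketch: enable the common extension for a session.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.extensions",
        "org.apache.kyuubi.sql.KyuubiSparkSQLCommonExtension")
      .getOrCreate()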
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
index 974e04b..e76e81f 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.sql.zorder
 
 import java.util.Locale
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Repartition, RepartitionByExpression, Sort}
@@ -161,3 +162,21 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
     }
   }
 }
+
+/**
+ * TODO: shall we forbid zorder if it is a dynamic partition insert?
+ * Insert zorder before writing to a datasource table if its properties contain zorder properties
+ */
+case class InsertZorderBeforeWritingDatasource(session: SparkSession)
+  extends InsertZorderBeforeWritingDatasourceBase {
+  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
+}
+
+/**
+ * TODO: shall we forbid zorder if it is a dynamic partition insert?
+ * Insert zorder before writing to a Hive table if its properties contain zorder properties
+ */
+case class InsertZorderBeforeWritingHive(session: SparkSession)
+  extends InsertZorderBeforeWritingHiveBase {
+  override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
+}
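
A hedged usage sketch for the two rules above: they rewrite a write only when the target table carries zorder table properties. The property keys kyuubi.zorder.enabled and kyuubi.zorder.cols used below are assumptions, not shown in this diff:

    // Assumed property keys: mark a table so that later INSERTs get a
    // zorder sort injected by InsertZorderBeforeWritingDatasource / ...Hive.
    spark.sql(
      """ALTER TABLE t SET TBLPROPERTIES (
        |  'kyuubi.zorder.enabled' = 'true',
        |  'kyuubi.zorder.cols' = 'c1,c2')""".stripMargin)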
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala
index 262964a..c4f52fb 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderCommandBase.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.sql.zorder
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
@@ -64,3 +65,15 @@ abstract class OptimizeZorderCommandBase extends DataWritingCommand {
     Seq.empty
   }
 }
+
+/**
+ * A runnable command for zorder; we delegate execution to the underlying write command
+ */
+case class OptimizeZorderCommand(
+    catalogTable: CatalogTable,
+    query: LogicalPlan)
+  extends OptimizeZorderCommandBase {
+  protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+    copy(query = newChild)
+  }
+}
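
OptimizeZorderCommand is what the OPTIMIZE ... ZORDER BY statement resolves to; the ZorderSuite tests above exercise the same syntax directly:

    // Rewrites table t, sorting rows by the interleaved z-values of c1, c2.
    spark.sql("OPTIMIZE t ZORDER BY c1, c2")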
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala
index f9201ab..a9bb5a5 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/OptimizeZorderStatementBase.scala
@@ -30,3 +30,14 @@ abstract class OptimizeZorderStatementBase extends UnaryNode {
   override def child: LogicalPlan = query
   override def output: Seq[Attribute] = child.output
 }
+
+/**
+ * A zorder statement that holds the query we parsed from SQL.
+ * The Analyzer should convert this plan to a concrete command.
+ */
+case class OptimizeZorderStatement(
+    tableIdentifier: Seq[String],
+    query: LogicalPlan) extends OptimizeZorderStatementBase {
+  protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(query = newChild)
+}
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala
index 1fee20c..d839c02 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ResolveZorderBase.scala
@@ -65,3 +65,13 @@ abstract class ResolveZorderBase extends Rule[LogicalPlan] {
     case _ => plan
   }
 }
+
+/**
+ * Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
+ */
+case class ResolveZorder(session: SparkSession) extends ResolveZorderBase {
+  override def buildOptimizeZorderCommand(
+      catalogTable: CatalogTable, query: LogicalPlan): OptimizeZorderCommandBase = {
+    OptimizeZorderCommand(catalogTable, query)
+  }
+}
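
Mirroring the "skip zorder if only requires one column" test above, the resolution performed by this rule can be observed on the analyzed plan:

    // After analysis, the parsed OptimizeZorderStatement has been replaced
    // by an OptimizeZorderCommand wrapping the sorted query.
    val analyzed = spark.sql("OPTIMIZE t ZORDER BY c1, c2")
      .queryExecution.analyzed
    assert(analyzed.isInstanceOf[OptimizeZorderCommandBase])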
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala
index 6069e0b..29e7d5b 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderBase.scala
@@ -87,3 +87,8 @@ abstract class ZorderBase extends Expression {
       isNull = FalseLiteral)
   }
 }
+
+case class Zorder(children: Seq[Expression]) extends ZorderBase {
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
+    copy(children = newChildren)
+}
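
The concrete Zorder expression can also be constructed directly over catalyst expressions, as the null-type test further below does; a minimal sketch:

    import org.apache.spark.sql.catalyst.expressions.Literal

    // Interleaves the bits of the child values into a single sort key;
    // child types that ZorderBase does not support fail analysis.
    val z = Zorder(Seq(Literal(1), Literal(2L)))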
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala
index a235f04..de6457e 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSparkSqlExtensionsParserBase.scala
@@ -104,6 +104,12 @@ abstract class ZorderSparkSqlExtensionsParserBase extends ParserInterface {
   }
 }
 
+class ZorderSparkSqlExtensionsParser(
+    override val delegate: ParserInterface)
+  extends ZorderSparkSqlExtensionsParserBase {
+  def astBuilder: ZorderSqlAstBuilderBase = new ZorderSqlAstBuilder
+}
+
 /* Copied from Apache Spark's to avoid dependency on Spark Internals */
 class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
   override def consume(): Unit = wrapped.consume
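
A hedged sketch of the injected parser in isolation: statements other than OPTIMIZE ... ZORDER BY fall through to the delegate, while the zorder syntax is turned into an OptimizeZorderStatement by the AST builder below:

    // Wrap the session's parser, as injectParser does above.
    val parser = new ZorderSparkSqlExtensionsParser(
      spark.sessionState.sqlParser)
    // Parses into an (unresolved) OptimizeZorderStatement.
    val plan = parser.parsePlan("OPTIMIZE t ZORDER BY c1, c2")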
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala
index 2467430..33ed18a 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/zorder/ZorderSqlAstBuilderBase.scala
@@ -415,3 +415,14 @@ abstract class ZorderSqlAstBuilderBase extends ZorderSqlExtensionsBaseVisitor[An
     }
   }
 }
+
+class ZorderSqlAstBuilder extends ZorderSqlAstBuilderBase {
+  override def buildZorder(child: Seq[Expression]): ZorderBase = {
+    Zorder(child)
+  }
+
+  override def buildOptimizeZorderStatement(
+      tableIdentifier: Seq[String], query: LogicalPlan): OptimizeZorderStatementBase = {
+    OptimizeZorderStatement(tableIdentifier, query)
+  }
+}
diff --git a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
similarity index 91%
rename from dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
rename to dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
index 7abeee4..8ffa4bd 100644
--- a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
+++ b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
 
@@ -28,6 +29,12 @@ class InsertShuffleNodeBeforeJoinSuite extends KyuubiSparkSQLExtensionTest {
     setupData()
   }
 
+  override def sparkConf(): SparkConf = {
+    super.sparkConf()
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+        "org.apache.kyuubi.sql.KyuubiSparkSQLCommonExtension")
+  }
+
   test("force shuffle before join") {
     def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
       var expectedResult: Seq[Row] = Seq.empty
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
similarity index 97%
rename from dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
rename to dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
index 9d8a842..490e252 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
+++ b/dev/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/ZorderSuite.scala
@@ -25,13 +25,18 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectComma
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 import org.apache.kyuubi.sql.zorder.{OptimizeZorderCommandBase, Zorder}
 
 trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper {
+  override def sparkConf(): SparkConf = {
+    super.sparkConf()
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+        "org.apache.kyuubi.sql.KyuubiSparkSQLCommonExtension")
+  }
 
   test("optimize unpartitioned table") {
     withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
@@ -309,7 +314,10 @@ trait ZorderSuite extends KyuubiSparkSQLExtensionTest with ExpressionEvalHelper
         val plan = Project(Seq(Alias(zorder, "c")()), OneRowRelation())
         spark.sessionState.analyzer.checkAnalysis(plan)
       }.getMessage
-      assert(msg.contains("Unsupported z-order type: null"))
+      // before Spark 3.2.0 the catalog string of the null type is "null"; since Spark 3.2.0 it is "void"
+      // see https://github.com/apache/spark/pull/33437
+      assert(msg.contains("Unsupported z-order type:") &&
+        (msg.contains("null") || msg.contains("void")))
     }
 
     checkZorderPlan(Zorder(Seq(Literal(null, NullType))))