Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:29 UTC

[hudi] 02/20: [HUDI-5278] Support more conf to cluster procedure (#7304)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ebf99a7e84f2c937a968a5a51f48945919c07015
Author: KnightChess <98...@qq.com>
AuthorDate: Wed Nov 30 09:02:07 2022 +0800

    [HUDI-5278] Support more conf to cluster procedure (#7304)
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  52 ++++
 .../hudi/config/metrics/HoodieMetricsConfig.java   |   2 +-
 .../procedures/RunClusteringProcedure.scala        | 143 ++++++++---
 .../hudi/procedure/TestClusteringProcedure.scala   | 269 ++++++++++++++++++++-
 4 files changed, 432 insertions(+), 34 deletions(-)
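
The patch adds four optional parameters to the run_clustering procedure: op ('schedule', 'execute' or 'scheduleandexecute'), order_strategy, options (comma-separated key=value pairs) and instants. A rough usage sketch, not part of the patch, assuming a SparkSession named spark with the Hudi SQL extensions, an existing Hudi table named tbl, and a made-up instant timestamp:

    // Only schedule a clustering plan.
    spark.sql("call run_clustering(table => 'tbl', op => 'schedule')")

    // Only execute already-pending plans, optionally restricted to specific instants.
    spark.sql("call run_clustering(table => 'tbl', op => 'execute', instants => '20221130090207000')")

    // Schedule and execute (the default), with a layout strategy and extra writer options.
    spark.sql(
      """call run_clustering(table => 'tbl', order => 'c2,c3', order_strategy => 'z-order',
        |  options => "hoodie.parquet.small.file.limit=0, hoodie.metadata.enable=true")""".stripMargin)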

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 1180845a6ed..8db88178f8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -680,5 +680,57 @@ public class HoodieClusteringConfig extends HoodieConfig {
 
       return enumValue;
     }
+
+    public String getValue() {
+      return value;
+    }
+  }
+
+  public enum ClusteringOperator {
+
+    /**
+     * only schedule the clustering plan
+     */
+    SCHEDULE("schedule"),
+
+    /**
+     * only execute the pending clustering plans
+     */
+    EXECUTE("execute"),
+
+    /**
+     * schedule clustering first, then execute all pending clustering plans
+     */
+    SCHEDULE_AND_EXECUTE("scheduleandexecute");
+
+    private static final Map<String, ClusteringOperator> VALUE_TO_ENUM_MAP =
+            TypeUtils.getValueToEnumMap(ClusteringOperator.class, e -> e.value);
+
+    private final String value;
+
+    ClusteringOperator(String value) {
+      this.value = value;
+    }
+
+    @Nonnull
+    public static ClusteringOperator fromValue(String value) {
+      ClusteringOperator enumValue = VALUE_TO_ENUM_MAP.get(value);
+      if (enumValue == null) {
+        throw new HoodieException(String.format("Invalid value (%s)", value));
+      }
+      return enumValue;
+    }
+
+    public boolean isSchedule() {
+      return this != ClusteringOperator.EXECUTE;
+    }
+
+    public boolean isExecute() {
+      return this != ClusteringOperator.SCHEDULE;
+    }
+
+    public String getValue() {
+      return value;
+    }
   }
 }
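
The new ClusteringOperator enum backs the procedure's op parameter: fromValue looks up the lower-case value string, SCHEDULE_AND_EXECUTE satisfies both isSchedule and isExecute, and an unknown value raises HoodieException. A small illustrative check of those semantics, not part of the patch:

    import org.apache.hudi.config.HoodieClusteringConfig.ClusteringOperator

    // Lookup is by the lower-case value string defined on each constant.
    val op = ClusteringOperator.fromValue("scheduleandexecute")
    assert(op.isSchedule && op.isExecute)            // schedules, then executes pending plans
    assert(!ClusteringOperator.EXECUTE.isSchedule)   // 'execute' never schedules a new plan
    assert(!ClusteringOperator.SCHEDULE.isExecute)   // 'schedule' never runs pending plans
    // ClusteringOperator.fromValue("bogus") would throw HoodieException("Invalid value (bogus)")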
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 787819be120..050b0519ee6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -47,7 +47,7 @@ public class HoodieMetricsConfig extends HoodieConfig {
 
   public static final ConfigProperty<Boolean> TURN_METRICS_ON = ConfigProperty
       .key(METRIC_PREFIX + ".on")
-      .defaultValue(true)
+      .defaultValue(false)
       .sinceVersion("0.5.0")
       .withDocumentation("Turn on/off metrics reporting. off by default.");
 
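The change above flips hoodie.metrics.on to false, so the default now matches the "off by default" documentation. A hedged sketch of re-enabling metrics for a write, where df, the table name and the path are placeholders and GRAPHITE is just one of the supported reporter types:

    // Assumes an existing DataFrame df with the table's columns.
    df.write.format("hudi")
      .option("hoodie.table.name", "tbl")
      .option("hoodie.datasource.write.recordkey.field", "c1")
      .option("hoodie.metrics.on", "true")                 // no longer on by default
      .option("hoodie.metrics.reporter.type", "GRAPHITE")  // illustrative reporter choice
      .mode("append")
      .save("/tmp/hudi/tbl")
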
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 18ea636c057..d34c0b0d7b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
+import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieClusteringConfig
+import org.apache.hudi.config.HoodieClusteringConfig.{ClusteringOperator, LayoutOptimizationStrategy}
 import org.apache.hudi.exception.HoodieClusteringException
 import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
 import org.apache.spark.internal.Logging
@@ -32,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.types._
 
+import java.util.Locale
 import java.util.function.Supplier
 import scala.collection.JavaConverters._
 
@@ -49,7 +52,12 @@ class RunClusteringProcedure extends BaseProcedure
     ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
     ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "op", DataTypes.StringType, None),
+    ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
+    // params => key=value, key2=value2
+    ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
+    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -71,6 +79,10 @@ class RunClusteringProcedure extends BaseProcedure
     val predicate = getArgValueOrDefault(args, PARAMETERS(2))
     val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
     val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
+    val op = getArgValueOrDefault(args, PARAMETERS(5))
+    val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
+    val options = getArgValueOrDefault(args, PARAMETERS(7))
+    val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -99,41 +111,98 @@ class RunClusteringProcedure extends BaseProcedure
         logInfo("No order columns")
     }
 
+    orderStrategy match {
+      case Some(o) =>
+        val strategy = LayoutOptimizationStrategy.fromValue(o.asInstanceOf[String])
+        conf = conf ++ Map(
+          HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key() -> strategy.getValue
+        )
+      case _ =>
+        logInfo("No order strategy")
+    }
+
+    options match {
+      case Some(p) =>
+        val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
+        paramPairs.foreach{ pair =>
+          val values = StringUtils.split(pair, "=")
+          conf = conf ++ Map(values.get(0) -> values.get(1))
+        }
+      case _ =>
+        logInfo("No options")
+    }
+
     // Get all pending clustering instants
     var pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient)
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
-    logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
 
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
-    val instantTime = HoodieActiveTimeline.createNewInstantTime
-    if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
-      pendingClustering ++= Seq(instantTime)
+    var operator: ClusteringOperator = ClusteringOperator.SCHEDULE_AND_EXECUTE
+    pendingClustering = instantsStr match {
+      case Some(inst) =>
+        op match {
+          case Some(o) =>
+            if (!ClusteringOperator.EXECUTE.name().equalsIgnoreCase(o.asInstanceOf[String])) {
+              throw new HoodieClusteringException("Specific instants can only be used with the 'execute' op, or when no op is specified")
+            }
+          case _ =>
+            logInfo("No op and set it to EXECUTE with instants specified.")
+        }
+        operator = ClusteringOperator.EXECUTE
+        checkAndFilterPendingInstants(pendingClustering, inst.asInstanceOf[String])
+      case _ =>
+        logInfo("No specific instants")
+        op match {
+          case Some(o) =>
+            operator = ClusteringOperator.fromValue(o.asInstanceOf[String].toLowerCase(Locale.ROOT))
+          case _ =>
+            logInfo("No op, use default scheduleAndExecute")
+        }
+        pendingClustering
     }
-    logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
-
-    val startTs = System.currentTimeMillis()
-    pendingClustering.foreach(client.cluster(_, true))
-    logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
-      s" time cost: ${System.currentTimeMillis() - startTs}ms.")
-
-    val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
-      .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
-      .toSeq
-      .sortBy(f => f.getTimestamp)
-      .reverse
-
-    val clusteringPlans = clusteringInstants.map(instant =>
-      ClusteringUtils.getClusteringPlan(metaClient, instant)
-    )
-
-    if (showInvolvedPartitions) {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
-          p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+
+    logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
+
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
+      if (operator.isSchedule) {
+        val instantTime = HoodieActiveTimeline.createNewInstantTime
+        if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
+          pendingClustering ++= Seq(instantTime)
+        }
+      }
+      logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
+
+      if (operator.isExecute) {
+        val startTs = System.currentTimeMillis()
+        pendingClustering.foreach(client.cluster(_, true))
+        logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
+          s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+      }
+
+      val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+        .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
+        .toSeq
+        .sortBy(f => f.getTimestamp)
+        .reverse
+
+      val clusteringPlans = clusteringInstants.map(instant =>
+        ClusteringUtils.getClusteringPlan(metaClient, instant)
+      )
+
+      if (showInvolvedPartitions) {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+            p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+        }
+      } else {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+        }
       }
-    } else {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+    } finally {
+      if (client != null) {
+        client.close()
       }
     }
   }
@@ -174,6 +243,16 @@ class RunClusteringProcedure extends BaseProcedure
     })
   }
 
+  private def checkAndFilterPendingInstants(pendingInstants: Seq[String], instantStr: String): Seq[String] = {
+    val instants = StringUtils.split(instantStr, ",").asScala
+    val pendingSet = pendingInstants.toSet
+    val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
+    if (noneInstants.nonEmpty) {
+      throw new HoodieClusteringException(s"Specified instants ${noneInstants.mkString(",")} do not exist or are not pending")
+    }
+    instants.sortBy(f => f)
+  }
+
 }
 
 object RunClusteringProcedure {
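
For reference, the options parameter handled above is parsed with Hudi's StringUtils.split as comma-separated key=value pairs (the tests below pass a multi-line options string, so surrounding whitespace appears to be tolerated). A standalone sketch of that convention, with illustrative values:

    import org.apache.hudi.common.util.StringUtils
    import scala.collection.JavaConverters._

    // "key=value, key2=value2"  ->  Map(key -> value, key2 -> value2)
    val optionsStr = "hoodie.parquet.small.file.limit=0, hoodie.metadata.enable=true"
    val parsed = StringUtils.split(optionsStr, ",").asScala.map { pair =>
      val kv = StringUtils.split(pair, "=")
      kv.get(0) -> kv.get(1)
    }.toMap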
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 456f9c5066a..e488811c0db 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -19,11 +19,21 @@
 
 package org.apache.spark.sql.hudi.procedure
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
+import org.apache.spark.sql.{Dataset, Row}
 
+import java.util
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
 class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
@@ -385,4 +395,261 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
       }
     }
   }
+
+  test("Test Call run_clustering Procedure with specific instants") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+     """.stripMargin)
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      val instants = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+      assert(2 == instants.size)
+
+      checkExceptionContain(
+        s"call run_clustering(table => '$tableName', instants => '000000, ${instants.head}')"
+      )("specific 000000 instants is not exist")
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(2 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      // specifying instants will not schedule a new clustering plan
+      spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants.mkString(",")}')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+
+      // test with operator schedule
+      checkExceptionContain(
+      s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
+      )("specific instants only can be used in 'execute' op or not specific op")
+
+      // test with operator scheduleAndExecute
+      checkExceptionContain(
+        s"call run_clustering(table => '$tableName', instants => '000000', op => 'scheduleAndExecute')"
+      )("specific instants only can be used in 'execute' op or not specific op")
+
+      // test with operator execute
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      metaClient.reloadActiveTimeline()
+      val instants2 = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+      spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants2.mkString(",")}', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+    }
+  }
+
+  test("Test Call run_clustering Procedure op") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+     """.stripMargin)
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate"-> "false"))
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(metaClient.getActiveTimeline.filterPendingReplaceTimeline().empty())
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(1 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(1 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      spark.sql(s"call run_clustering(table => '$tableName')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      spark.sql(s"call run_clustering(table => '$tableName')")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      checkExceptionContain(s"call run_clustering(table => '$tableName', op => 'null')")("Invalid value")
+    }
+  }
+
+  test("Test Call run_clustering Procedure Order Strategy") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      val metadataOpts = Map(
+        HoodieMetadataConfig.ENABLE.key -> "true",
+        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" ,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key() -> "true"
+      )
+
+      val queryOpts = metadataOpts ++ Map(
+        "path" -> basePath,
+        DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+      )
+
+      val dataFilterC2 = EqualTo(AttributeReference("c2", StringType, nullable = false)(), Literal("foo23"))
+      val dataFilterC3 = EqualTo(AttributeReference("c3", StringType, nullable = false)(), Literal("bar23"))
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+     """.stripMargin)
+
+      val fileNum = 20
+      val numRecords = 400000
+
+      // insert records
+      writeRecords(fileNum, numRecords, 0, basePath,  metadataOpts ++ Map("hoodie.avro.schema.validate"-> "false"))
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      val avgSize = avgRecord(metaClient.getActiveTimeline)
+      val avgCount = Math.ceil(1.0 * numRecords / fileNum).toLong
+
+      spark.sql(
+        s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'linear', options => "
+           | hoodie.copyonwrite.record.size.estimate=$avgSize,
+           | hoodie.parquet.max.file.size=${avgSize * avgCount},
+           | hoodie.parquet.small.file.limit=0,
+           | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.metadata.enable=true,
+           | hoodie.metadata.index.column.stats.enable=true
+           |")""".stripMargin)
+
+      metaClient.reloadActiveTimeline()
+      val fileIndex1 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+      val orderAllFiles = fileIndex1.allFiles.size
+      val c2OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+      val c3OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+      spark.sql(
+        s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'z-order', options => "
+           | hoodie.copyonwrite.record.size.estimate=$avgSize,
+           | hoodie.parquet.max.file.size=${avgSize * avgCount},
+           | hoodie.parquet.small.file.limit=0,
+           | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.metadata.enable=true,
+           | hoodie.metadata.index.column.stats.enable=true
+           |")""".stripMargin)
+
+      metaClient.reloadActiveTimeline()
+      val fileIndex2 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+      val ZOrderAllFiles = fileIndex2.allFiles.size
+      val c2ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+      val c3ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+      assert((1.0 * c2OrderFilterCount / orderAllFiles) < (1.0 * c2ZOrderFilterCount / ZOrderAllFiles))
+      assert((1.0 * c3OrderFilterCount / orderAllFiles) > (1.0 * c3ZOrderFilterCount / ZOrderAllFiles))
+    }
+  }
+
+  def avgRecord(commitTimeline: HoodieTimeline): Long = {
+    var totalByteSize = 0L
+    var totalRecordsCount = 0L
+    commitTimeline.getReverseOrderedInstants.toArray.foreach(instant => {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(instant.asInstanceOf[HoodieInstant]).get, classOf[HoodieCommitMetadata])
+      totalByteSize = totalByteSize + commitMetadata.fetchTotalBytesWritten()
+      totalRecordsCount = totalRecordsCount + commitMetadata.fetchTotalRecordsWritten()
+    })
+
+    Math.ceil((1.0 * totalByteSize) / totalRecordsCount).toLong
+  }
+
+  def writeRecords(files: Int, numRecords: Int, partitions: Int, location: String, options: Map[String, String]): Unit = {
+    val records = new util.ArrayList[Row](numRecords)
+    val rowDimension = Math.ceil(Math.sqrt(numRecords)).toInt
+
+    val data = Stream.range(0, rowDimension, 1)
+      .flatMap(x => Stream.range(0, rowDimension, 1).map(y => Pair.of(x, y)))
+
+    if (partitions > 0) {
+      data.foreach { i =>
+        records.add(Row(i.getLeft % partitions, "foo" + i.getLeft, "bar" + i.getRight))
+      }
+    } else {
+      data.foreach { i =>
+        records.add(Row(i.getLeft, "foo" + i.getLeft, "bar" + i.getRight))
+      }
+    }
+
+    val struct = StructType(Array[StructField](
+      StructField("c1", DataTypes.IntegerType, nullable = true, Metadata.empty),
+      StructField("c2", DataTypes.StringType, nullable = true, Metadata.empty),
+      StructField("c3", DataTypes.StringType, nullable = true, Metadata.empty)
+    ))
+
+    // repartitioning into `files` partitions does not control how many files Hudi finally writes
+    val df = spark.createDataFrame(records, struct).repartition(files)
+    writeDF(df, location, options)
+  }
+
+  def writeDF(df: Dataset[Row], location: String, options: Map[String, String]): Unit = {
+    df.select("c1", "c2", "c3")
+      .sortWithinPartitions("c1", "c2")
+      .write
+      .format("hudi")
+      .option(OPERATION.key(), WriteOperationType.INSERT.value())
+      .option(RECORDKEY_FIELD.key(), "c1")
+      .options(options)
+      .mode("append").save(location)
+  }
 }