You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:29 UTC
[hudi] 02/20: [HUDI-5278] Support more conf to cluster procedure (#7304)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ebf99a7e84f2c937a968a5a51f48945919c07015
Author: KnightChess <98...@qq.com>
AuthorDate: Wed Nov 30 09:02:07 2022 +0800
[HUDI-5278] Support more conf to cluster procedure (#7304)
---
.../apache/hudi/config/HoodieClusteringConfig.java | 52 ++++
.../hudi/config/metrics/HoodieMetricsConfig.java | 2 +-
.../procedures/RunClusteringProcedure.scala | 143 ++++++++---
.../hudi/procedure/TestClusteringProcedure.scala | 269 ++++++++++++++++++++-
4 files changed, 432 insertions(+), 34 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 1180845a6ed..8db88178f8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -680,5 +680,57 @@ public class HoodieClusteringConfig extends HoodieConfig {
return enumValue;
}
+
+ public String getValue() {
+ return value;
+ }
+ }
+
+ public enum ClusteringOperator {
+
+ /**
+ * only schedule the clustering plan
+ */
+ SCHEDULE("schedule"),
+
+ /**
+ * only execute the pending clustering plans
+ */
+ EXECUTE("execute"),
+
+ /**
+ * schedule a clustering plan first, then execute all pending clustering plans
+ */
+ SCHEDULE_AND_EXECUTE("scheduleandexecute");
+
+ private static final Map<String, ClusteringOperator> VALUE_TO_ENUM_MAP =
+ TypeUtils.getValueToEnumMap(ClusteringOperator.class, e -> e.value);
+
+ private final String value;
+
+ ClusteringOperator(String value) {
+ this.value = value;
+ }
+
+ @Nonnull
+ public static ClusteringOperator fromValue(String value) {
+ ClusteringOperator enumValue = VALUE_TO_ENUM_MAP.get(value);
+ if (enumValue == null) {
+ throw new HoodieException(String.format("Invalid value (%s)", value));
+ }
+ return enumValue;
+ }
+
+ public boolean isSchedule() {
+ return this != ClusteringOperator.EXECUTE;
+ }
+
+ public boolean isExecute() {
+ return this != ClusteringOperator.SCHEDULE;
+ }
+
+ public String getValue() {
+ return value;
+ }
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 787819be120..050b0519ee6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -47,7 +47,7 @@ public class HoodieMetricsConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> TURN_METRICS_ON = ConfigProperty
.key(METRIC_PREFIX + ".on")
- .defaultValue(true)
+ .defaultValue(false)
.sinceVersion("0.5.0")
.withDocumentation("Turn on/off metrics reporting. off by default.");
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 18ea636c057..d34c0b0d7b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -18,11 +18,13 @@
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
+import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieClusteringConfig
+import org.apache.hudi.config.HoodieClusteringConfig.{ClusteringOperator, LayoutOptimizationStrategy}
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
import org.apache.spark.internal.Logging
@@ -32,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.types._
+import java.util.Locale
import java.util.function.Supplier
import scala.collection.JavaConverters._
@@ -49,7 +52,12 @@ class RunClusteringProcedure extends BaseProcedure
ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
- ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
+ ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false),
+ ProcedureParameter.optional(5, "op", DataTypes.StringType, None),
+ ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
+ // params => key=value, key2=value2
+ ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
+ ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -71,6 +79,10 @@ class RunClusteringProcedure extends BaseProcedure
val predicate = getArgValueOrDefault(args, PARAMETERS(2))
val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
+ val op = getArgValueOrDefault(args, PARAMETERS(5))
+ val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
+ val options = getArgValueOrDefault(args, PARAMETERS(7))
+ val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -99,41 +111,98 @@ class RunClusteringProcedure extends BaseProcedure
logInfo("No order columns")
}
+ orderStrategy match {
+ case Some(o) =>
+ val strategy = LayoutOptimizationStrategy.fromValue(o.asInstanceOf[String])
+ conf = conf ++ Map(
+ HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key() -> strategy.getValue
+ )
+ case _ =>
+ logInfo("No order strategy")
+ }
+
+ options match {
+ case Some(p) =>
+ val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
+ paramPairs.foreach{ pair =>
+ val values = StringUtils.split(pair, "=")
+ conf = conf ++ Map(values.get(0) -> values.get(1))
+ }
+ case _ =>
+ logInfo("No options")
+ }
+
// Get all pending clustering instants
var pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient)
.iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
- logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
- val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
- val instantTime = HoodieActiveTimeline.createNewInstantTime
- if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
- pendingClustering ++= Seq(instantTime)
+ var operator: ClusteringOperator = ClusteringOperator.SCHEDULE_AND_EXECUTE
+ pendingClustering = instantsStr match {
+ case Some(inst) =>
+ op match {
+ case Some(o) =>
+ if (!ClusteringOperator.EXECUTE.name().equalsIgnoreCase(o.asInstanceOf[String])) {
+ throw new HoodieClusteringException("specific instants only can be used in 'execute' op or not specific op")
+ }
+ case _ =>
+ logInfo("No op and set it to EXECUTE with instants specified.")
+ }
+ operator = ClusteringOperator.EXECUTE
+ checkAndFilterPendingInstants(pendingClustering, inst.asInstanceOf[String])
+ case _ =>
+ logInfo("No specific instants")
+ op match {
+ case Some(o) =>
+ operator = ClusteringOperator.fromValue(o.asInstanceOf[String].toLowerCase(Locale.ROOT))
+ case _ =>
+ logInfo("No op, use default scheduleAndExecute")
+ }
+ pendingClustering
}
- logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
-
- val startTs = System.currentTimeMillis()
- pendingClustering.foreach(client.cluster(_, true))
- logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
- s" time cost: ${System.currentTimeMillis() - startTs}ms.")
-
- val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
- .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
- .toSeq
- .sortBy(f => f.getTimestamp)
- .reverse
-
- val clusteringPlans = clusteringInstants.map(instant =>
- ClusteringUtils.getClusteringPlan(metaClient, instant)
- )
-
- if (showInvolvedPartitions) {
- clusteringPlans.map { p =>
- Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
- p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+
+ logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
+
+ var client: SparkRDDWriteClient[_] = null
+ try {
+ client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
+ if (operator.isSchedule) {
+ val instantTime = HoodieActiveTimeline.createNewInstantTime
+ if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
+ pendingClustering ++= Seq(instantTime)
+ }
+ }
+ logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
+
+ if (operator.isExecute) {
+ val startTs = System.currentTimeMillis()
+ pendingClustering.foreach(client.cluster(_, true))
+ logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
+ s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+ }
+
+ val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+ .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
+ .toSeq
+ .sortBy(f => f.getTimestamp)
+ .reverse
+
+ val clusteringPlans = clusteringInstants.map(instant =>
+ ClusteringUtils.getClusteringPlan(metaClient, instant)
+ )
+
+ if (showInvolvedPartitions) {
+ clusteringPlans.map { p =>
+ Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+ p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+ }
+ } else {
+ clusteringPlans.map { p =>
+ Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+ }
}
- } else {
- clusteringPlans.map { p =>
- Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+ } finally {
+ if (client != null) {
+ client.close()
}
}
}
@@ -174,6 +243,16 @@ class RunClusteringProcedure extends BaseProcedure
})
}
+ private def checkAndFilterPendingInstants(pendingInstants: Seq[String], instantStr: String): Seq[String] = {
+ val instants = StringUtils.split(instantStr, ",").asScala
+ val pendingSet = pendingInstants.toSet
+ val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
+ if (noneInstants.nonEmpty) {
+ throw new HoodieClusteringException(s"specific ${noneInstants.mkString(",")} instants is not exist")
+ }
+ instants.sortBy(f => f)
+ }
+
}
object RunClusteringProcedure {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 456f9c5066a..e488811c0db 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -19,11 +19,21 @@
package org.apache.spark.sql.hudi.procedure
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
+import org.apache.spark.sql.{Dataset, Row}
+import java.util
import scala.collection.JavaConverters.asScalaIteratorConverter
class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
@@ -385,4 +395,261 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
}
}
}
+
+ test("Test Call run_clustering Procedure with specific instants") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | c1 int,
+ | c2 string,
+ | c3 double
+ |) using hudi
+ | options (
+ | primaryKey = 'c1',
+ | type = 'cow',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.index.column.stats.enable = 'true',
+ | hoodie.enable.data.skipping = 'true',
+ | hoodie.datasource.write.operation = 'insert'
+ | )
+ | location '$basePath'
+ """.stripMargin)
+
+ writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+ spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+ writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+ spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+ val conf = new Configuration
+ val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+ val instants = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+ assert(2 == instants.size)
+
+ checkExceptionContain(
+ s"call run_clustering(table => '$tableName', instants => '000000, ${instants.head}')"
+ )("specific 000000 instants is not exist")
+ metaClient.reloadActiveTimeline()
+ assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(2 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+
+ writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+ // specifying instants does not schedule a new clustering plan
+ spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants.mkString(",")}')")
+ metaClient.reloadActiveTimeline()
+ assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+
+ // test with operator schedule
+ checkExceptionContain(
+ s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
+ )("specific instants only can be used in 'execute' op or not specific op")
+
+ // test with operator scheduleAndExecute
+ checkExceptionContain(
+ s"call run_clustering(table => '$tableName', instants => '000000', op => 'scheduleAndExecute')"
+ )("specific instants only can be used in 'execute' op or not specific op")
+
+ // test with operator execute
+ spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+ metaClient.reloadActiveTimeline()
+ val instants2 = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+ spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants2.mkString(",")}', op => 'execute')")
+ metaClient.reloadActiveTimeline()
+ assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+ }
+ }
+
+ test("Test Call run_clustering Procedure op") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | c1 int,
+ | c2 string,
+ | c3 double
+ |) using hudi
+ | options (
+ | primaryKey = 'c1',
+ | type = 'cow',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.index.column.stats.enable = 'true',
+ | hoodie.enable.data.skipping = 'true',
+ | hoodie.datasource.write.operation = 'insert'
+ | )
+ | location '$basePath'
+ """.stripMargin)
+
+ writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate"-> "false"))
+ val conf = new Configuration
+ val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+ assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(metaClient.getActiveTimeline.filterPendingReplaceTimeline().empty())
+
+ spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+ metaClient.reloadActiveTimeline()
+ assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(1 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+ spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+ metaClient.reloadActiveTimeline()
+ assert(1 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+ spark.sql(s"call run_clustering(table => '$tableName')")
+ metaClient.reloadActiveTimeline()
+ assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+ spark.sql(s"call run_clustering(table => '$tableName')")
+ metaClient.reloadActiveTimeline()
+ assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+ assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+ checkExceptionContain(s"call run_clustering(table => '$tableName', op => 'null')")("Invalid value")
+ }
+ }
+
+ test("Test Call run_clustering Procedure Order Strategy") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" ,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key() -> "true"
+ )
+
+ val queryOpts = metadataOpts ++ Map(
+ "path" -> basePath,
+ DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+ )
+
+ val dataFilterC2 = EqualTo(AttributeReference("c2", StringType, nullable = false)(), Literal("foo23"))
+ val dataFilterC3 = EqualTo(AttributeReference("c3", StringType, nullable = false)(), Literal("bar23"))
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | c1 int,
+ | c2 string,
+ | c3 double
+ |) using hudi
+ | options (
+ | primaryKey = 'c1',
+ | type = 'cow',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.index.column.stats.enable = 'true',
+ | hoodie.enable.data.skipping = 'true',
+ | hoodie.datasource.write.operation = 'insert'
+ | )
+ | location '$basePath'
+ """.stripMargin)
+
+ val fileNum = 20
+ val numRecords = 400000
+
+ // insert records
+ writeRecords(fileNum, numRecords, 0, basePath, metadataOpts ++ Map("hoodie.avro.schema.validate"-> "false"))
+ val conf = new Configuration
+ val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+ val avgSize = avgRecord(metaClient.getActiveTimeline)
+ val avgCount = Math.ceil(1.0 * numRecords / fileNum).toLong
+
+ spark.sql(
+ s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'linear', options => "
+ | hoodie.copyonwrite.record.size.estimate=$avgSize,
+ | hoodie.parquet.max.file.size=${avgSize * avgCount},
+ | hoodie.parquet.small.file.limit=0,
+ | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+ | hoodie.metadata.enable=true,
+ | hoodie.metadata.index.column.stats.enable=true
+ |")""".stripMargin)
+
+ metaClient.reloadActiveTimeline()
+ val fileIndex1 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+ val orderAllFiles = fileIndex1.allFiles.size
+ val c2OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+ val c3OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+ spark.sql(
+ s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'z-order', options => "
+ | hoodie.copyonwrite.record.size.estimate=$avgSize,
+ | hoodie.parquet.max.file.size=${avgSize * avgCount},
+ | hoodie.parquet.small.file.limit=0,
+ | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+ | hoodie.metadata.enable=true,
+ | hoodie.metadata.index.column.stats.enable=true
+ |")""".stripMargin)
+
+ metaClient.reloadActiveTimeline()
+ val fileIndex2 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+ val ZOrderAllFiles = fileIndex2.allFiles.size
+ val c2ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+ val c3ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+ assert((1.0 * c2OrderFilterCount / orderAllFiles) < (1.0 * c2ZOrderFilterCount / ZOrderAllFiles))
+ assert((1.0 * c3OrderFilterCount / orderAllFiles) > (1.0 * c3ZOrderFilterCount / ZOrderAllFiles))
+ }
+ }
+
+ def avgRecord(commitTimeline: HoodieTimeline): Long = {
+ var totalByteSize = 0L
+ var totalRecordsCount = 0L
+ commitTimeline.getReverseOrderedInstants.toArray.foreach(instant => {
+ val commitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(instant.asInstanceOf[HoodieInstant]).get, classOf[HoodieCommitMetadata])
+ totalByteSize = totalByteSize + commitMetadata.fetchTotalBytesWritten()
+ totalRecordsCount = totalRecordsCount + commitMetadata.fetchTotalRecordsWritten()
+ })
+
+ Math.ceil((1.0 * totalByteSize) / totalRecordsCount).toLong
+ }
+
+ def writeRecords(files: Int, numRecords: Int, partitions: Int, location: String, options: Map[String, String]): Unit = {
+ val records = new util.ArrayList[Row](numRecords)
+ val rowDimension = Math.ceil(Math.sqrt(numRecords)).toInt
+
+ val data = Stream.range(0, rowDimension, 1)
+ .flatMap(x => Stream.range(0, rowDimension, 1).map(y => Pair.of(x, y)))
+
+ if (partitions > 0) {
+ data.foreach { i =>
+ records.add(Row(i.getLeft % partitions, "foo" + i.getLeft, "bar" + i.getRight))
+ }
+ } else {
+ data.foreach { i =>
+ records.add(Row(i.getLeft, "foo" + i.getLeft, "bar" + i.getRight))
+ }
+ }
+
+ val struct = StructType(Array[StructField](
+ StructField("c1", DataTypes.IntegerType, nullable = true, Metadata.empty),
+ StructField("c2", DataTypes.StringType, nullable = true, Metadata.empty),
+ StructField("c3", DataTypes.StringType, nullable = true, Metadata.empty)
+ ))
+
+ // note: `files` only repartitions the input DataFrame; it has no effect on the files Hudi writes
+ val df = spark.createDataFrame(records, struct).repartition(files)
+ writeDF(df, location, options)
+ }
+
+ def writeDF(df: Dataset[Row], location: String, options: Map[String, String]): Unit = {
+ df.select("c1", "c2", "c3")
+ .sortWithinPartitions("c1", "c2")
+ .write
+ .format("hudi")
+ .option(OPERATION.key(), WriteOperationType.INSERT.value())
+ .option(RECORDKEY_FIELD.key(), "c1")
+ .options(options)
+ .mode("append").save(location)
+ }
}