You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and is not included in this text.)
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:36 UTC
[hudi] 09/20: [MINOR] improve RunClusteringProcedure with partition selected
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit cca17a599d6c571297c91c5a8d9a9c790d0ce258
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Tue Feb 7 14:01:21 2023 +0800
[MINOR] improve RunClusteringProcedure with partition selected
---
.../PartitionAwareClusteringPlanStrategy.java | 2 ++
.../procedures/RunClusteringProcedure.scala | 31 +++++++++++++---------
.../hudi/procedure/TestClusteringProcedure.scala | 12 ++++-----
3 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 8aafa6d28c4..03d83dd12ea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -73,6 +73,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
HoodieWriteConfig config = getWriteConfig();
String partitionSelected = config.getClusteringPartitionSelected();
+ LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);
List<String> partitionPaths;
if (StringUtils.isNullOrEmpty(partitionSelected)) {
@@ -84,6 +85,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
}
partitionPaths = filterPartitionPaths(partitionPaths);
+ LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no clustering plan
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index d34c0b0d7b7..449e03c2e48 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
@@ -57,7 +57,8 @@ class RunClusteringProcedure extends BaseProcedure
ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
// params => key=value, key2=value2
ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
- ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
+ ProcedureParameter.optional(8, "instants", DataTypes.StringType, None),
+ ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,20 +84,26 @@ class RunClusteringProcedure extends BaseProcedure
val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
val options = getArgValueOrDefault(args, PARAMETERS(7))
val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
+ val parts = getArgValueOrDefault(args, PARAMETERS(9))
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
var conf: Map[String, String] = Map.empty
- predicate match {
- case Some(p) =>
- val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String])
- conf = conf ++ Map(
- HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
- HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions
- )
- logInfo(s"Partition predicates: $p, partition selected: $prunedPartitions")
- case _ =>
- logInfo("No partition predicates")
+
+ val selectedPartitions: String = (parts, predicate) match {
+ case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
+ case (Some(o), _) => o.asInstanceOf[String]
+ case _ => ""
+ }
+
+ if (selectedPartitions.nonEmpty) {
+ conf = conf ++ Map(
+ HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
+ HoodieClusteringConfig.PARTITION_SELECTED.key() -> selectedPartitions
+ )
+ logInfo(s"Partition selected: $selectedPartitions")
+ } else {
+ logInfo("No partition selected")
}
// Construct sort column info
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index c9fbb0f5d9f..b21dbcad70d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -605,7 +605,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
}
}
- test("Test Call run_clustering with partition selected config") {
+ test("Test Call run_clustering with partition selected") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -631,9 +631,9 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
- spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
// Do
- val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+ val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+ s"selected_partitions => 'ts=1010', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(result.length)
@@ -646,13 +646,13 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
)
}
- // Test clustering with PARTITION_SELECTED config set, choose all partitions to schedule
+ // Test clustering with PARTITION_SELECTED, choose all partitions to schedule
{
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
- spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
- val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+ val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+ s"selected_partitions => 'ts=1010,ts=1011,ts=1012', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(result.length)