You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/08 13:32:46 UTC
[hudi] branch master updated: [MINOR] improve RunClusteringProcedure with partition selected (#7876)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b2e3805b3fb [MINOR] improve RunClusteringProcedure with partition selected (#7876)
b2e3805b3fb is described below
commit b2e3805b3fbacd3b3b223aa8615f8a1d7b714376
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Wed Feb 8 21:32:36 2023 +0800
[MINOR] improve RunClusteringProcedure with partition selected (#7876)
---
.../PartitionAwareClusteringPlanStrategy.java | 2 ++
.../procedures/RunClusteringProcedure.scala | 31 +++++++++++++---------
.../hudi/procedure/TestClusteringProcedure.scala | 12 ++++-----
3 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 915ccc2df03..38e1dcc5027 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -76,6 +76,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
HoodieWriteConfig config = getWriteConfig();
String partitionSelected = config.getClusteringPartitionSelected();
+ LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);
List<String> partitionPaths;
if (StringUtils.isNullOrEmpty(partitionSelected)) {
@@ -87,6 +88,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
}
partitionPaths = filterPartitionPaths(partitionPaths);
+ LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no clustering plan
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index d34c0b0d7b7..449e03c2e48 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
@@ -57,7 +57,8 @@ class RunClusteringProcedure extends BaseProcedure
ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
// params => key=value, key2=value2
ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
- ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
+ ProcedureParameter.optional(8, "instants", DataTypes.StringType, None),
+ ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,20 +84,26 @@ class RunClusteringProcedure extends BaseProcedure
val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
val options = getArgValueOrDefault(args, PARAMETERS(7))
val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
+ val parts = getArgValueOrDefault(args, PARAMETERS(9))
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
var conf: Map[String, String] = Map.empty
- predicate match {
- case Some(p) =>
- val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String])
- conf = conf ++ Map(
- HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
- HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions
- )
- logInfo(s"Partition predicates: $p, partition selected: $prunedPartitions")
- case _ =>
- logInfo("No partition predicates")
+
+ val selectedPartitions: String = (parts, predicate) match {
+ case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
+ case (Some(o), _) => o.asInstanceOf[String]
+ case _ => ""
+ }
+
+ if (selectedPartitions.nonEmpty) {
+ conf = conf ++ Map(
+ HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
+ HoodieClusteringConfig.PARTITION_SELECTED.key() -> selectedPartitions
+ )
+ logInfo(s"Partition selected: $selectedPartitions")
+ } else {
+ logInfo("No partition selected")
}
// Construct sort column info
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 3050a405138..96bd8ec480b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -605,7 +605,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
}
}
- test("Test Call run_clustering with partition selected config") {
+ test("Test Call run_clustering with partition selected") {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -631,9 +631,9 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
- spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
// Do
- val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+ val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+ s"selected_partitions => 'ts=1010', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(result.length)
@@ -646,13 +646,13 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
)
}
- // Test clustering with PARTITION_SELECTED config set, choose all partitions to schedule
+ // Test clustering with the selected_partitions argument, choosing all partitions to schedule
{
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
- spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
- val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+ val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+ s"selected_partitions => 'ts=1010,ts=1011,ts=1012', show_involved_partition => true)")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
assertResult(1)(result.length)