Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/08 13:32:46 UTC

[hudi] branch master updated: [MINOR] improve RunClusteringProcedure with partition selected (#7876)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b2e3805b3fb [MINOR] improve RunClusteringProcedure with partition selected (#7876)
b2e3805b3fb is described below

commit b2e3805b3fbacd3b3b223aa8615f8a1d7b714376
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Wed Feb 8 21:32:36 2023 +0800

    [MINOR] improve RunClusteringProcedure with partition selected (#7876)
---
 .../PartitionAwareClusteringPlanStrategy.java      |  2 ++
 .../procedures/RunClusteringProcedure.scala        | 31 +++++++++++++---------
 .../hudi/procedure/TestClusteringProcedure.scala   | 12 ++++-----
 3 files changed, 27 insertions(+), 18 deletions(-)
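
In practice, this change lets callers pass the target partitions directly as a
procedure argument instead of setting HoodieClusteringConfig.PARTITION_SELECTED
through a session-level "set", as the updated test further down shows. A minimal
usage sketch (table name is illustrative):

    // cluster only the ts=1010 partition via the new selected_partitions argument
    spark.sql(
      "call run_clustering(table => 'hudi_tbl', " +
        "selected_partitions => 'ts=1010', show_involved_partition => true)")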

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 915ccc2df03..38e1dcc5027 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -76,6 +76,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
     HoodieWriteConfig config = getWriteConfig();
 
     String partitionSelected = config.getClusteringPartitionSelected();
+    LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);
     List<String> partitionPaths;
 
     if (StringUtils.isNullOrEmpty(partitionSelected)) {
@@ -87,6 +88,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
     }
 
     partitionPaths = filterPartitionPaths(partitionPaths);
+    LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);
 
     if (partitionPaths.isEmpty()) {
       // In case no partitions could be picked, return no clustering plan
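
With the two log lines above in place, scheduling a plan for the test table
further down would emit something along these lines (values are illustrative;
the second line is the java.util.List toString of the filtered paths):

    Scheduling clustering partitionSelected: ts=1010
    Scheduling clustering partitionPaths: [ts=1010]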
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index d34c0b0d7b7..449e03c2e48 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
@@ -57,7 +57,8 @@ class RunClusteringProcedure extends BaseProcedure
     ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
     // params => key=value, key2=value2
     ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
-    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
+    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None),
+    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,20 +84,26 @@ class RunClusteringProcedure extends BaseProcedure
     val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
     val options = getArgValueOrDefault(args, PARAMETERS(7))
     val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
+    val parts = getArgValueOrDefault(args, PARAMETERS(9))
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     var conf: Map[String, String] = Map.empty
-    predicate match {
-      case Some(p) =>
-        val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String])
-        conf = conf ++ Map(
-          HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
-          HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions
-        )
-        logInfo(s"Partition predicates: $p, partition selected: $prunedPartitions")
-      case _ =>
-        logInfo("No partition predicates")
+
+    val selectedPartitions: String = (parts, predicate) match {
+      case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
+      case (Some(o), _) => o.asInstanceOf[String]
+      case _ => ""
+    }
+
+    if (selectedPartitions.nonEmpty) {
+      conf = conf ++ Map(
+        HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
+        HoodieClusteringConfig.PARTITION_SELECTED.key() -> selectedPartitions
+      )
+      logInfo(s"Partition selected: $selectedPartitions")
+    } else {
+      logInfo("No partition selected")
     }
 
     // Construct sort column info
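
Note the precedence encoded in the pattern match above: an explicit partition
predicate, when supplied, wins over selected_partitions, and an empty result
falls back to "No partition selected". So in a call like the following sketch
(illustrative; it assumes the procedure's existing predicate argument), the
partitions pruned from the predicate are clustered and 'ts=1012' is ignored:

    // predicate takes precedence over selected_partitions (illustrative call)
    spark.sql(
      "call run_clustering(table => 'hudi_tbl', " +
        "predicate => 'ts = 1010', selected_partitions => 'ts=1012')")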
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 3050a405138..96bd8ec480b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -605,7 +605,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
-  test("Test Call run_clustering with partition selected config") {
+  test("Test Call run_clustering with partition selected") {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -631,9 +631,9 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
         spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
         spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
-        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
         // Do
-        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+        val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+          s"selected_partitions => 'ts=1010', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
         assertResult(1)(result.length)
@@ -646,13 +646,13 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
         )
       }
 
-      // Test clustering with PARTITION_SELECTED config set, choose all partitions to schedule
+      // Test clustering with PARTITION_SELECTED, choose all partitions to schedule
       {
         spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
         spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
         spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
-        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
-        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+        val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+          s"selected_partitions => 'ts=1010,ts=1011,ts=1012', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
         assertResult(1)(result.length)