You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2022/05/19 01:48:11 UTC

[hudi] branch master updated: [HUDI-4116] Unify clustering/compaction related procedures' output type (#5620)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6573469e73 [HUDI-4116] Unify clustering/compaction related procedures' output type (#5620)
6573469e73 is described below

commit 6573469e73ea51ed6d1c24504e8be5abfa91c642
Author: huberylee <sh...@foxmail.com>
AuthorDate: Thu May 19 09:48:03 2022 +0800

    [HUDI-4116] Unify clustering/compaction related procedures' output type (#5620)
    
    * Unify clustering/compaction related procedures' output type
    
    * Address review comments
---
 .../scala/org/apache/hudi/HoodieCLIUtils.scala     |  15 ++-
 .../hudi/command/CompactionHoodiePathCommand.scala |  11 +--
 .../command/CompactionHoodieTableCommand.scala     |  13 +--
 .../command/CompactionShowHoodiePathCommand.scala  |  12 +--
 .../command/CompactionShowHoodieTableCommand.scala |  12 +--
 .../procedures/RunClusteringProcedure.scala        |  34 ++++++-
 .../procedures/RunCompactionProcedure.scala        |  29 ++++--
 .../procedures/ShowClusteringProcedure.scala       |  37 ++++++--
 .../procedures/ShowCompactionProcedure.scala       |  16 ++--
 .../hudi/procedure/TestClusteringProcedure.scala   | 103 +++++++++++++++------
 .../hudi/procedure/TestCompactionProcedure.scala   |  78 ++++++++++++----
 11 files changed, 247 insertions(+), 113 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 58c3324823..552e3cfc9b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -19,14 +19,14 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.avro.model.HoodieClusteringGroup
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.collection.immutable.Map
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}
 
 object HoodieCLIUtils {
 
@@ -46,4 +46,15 @@ object HoodieCLIUtils {
     DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
       metaClient.getTableConfig.getTableName, finalParameters.asJava)
   }
+
+  def extractPartitions(clusteringGroups: Seq[HoodieClusteringGroup]): String = {
+    var partitionPaths: Seq[String] = Seq.empty
+    clusteringGroups.foreach(g =>
+      g.getSlices.asScala.foreach(slice =>
+        partitionPaths = partitionPaths :+ slice.getPartitionPath
+      )
+    )
+
+    partitionPaths.sorted.mkString(",")
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala
index 5b513f7500..57aff092b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala
@@ -19,11 +19,9 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableMetaClient
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
 import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, RunCompactionProcedure}
-import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -50,10 +48,5 @@ case class CompactionHoodiePathCommand(path: String,
     RunCompactionProcedure.builder.get().build.call(procedureArgs)
   }
 
-  override val output: Seq[Attribute] = {
-    operation match {
-      case RUN => Seq.empty
-      case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
-    }
-  }
+  override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala
index 5e362314c2..adaaeae9e5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.CompactionOperation
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.hudi.command.procedures.RunCompactionProcedure
 import org.apache.spark.sql.{Row, SparkSession}
 
 @Deprecated
@@ -35,10 +35,5 @@ case class CompactionHoodieTableCommand(table: CatalogTable,
     CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
   }
 
-  override val output: Seq[Attribute] = {
-    operation match {
-      case RUN => Seq.empty
-      case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
-    }
-  }
+  override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala
index 965724163b..95a4ecf780 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableMetaClient
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, ShowCompactionProcedure}
-import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -42,11 +40,5 @@ case class CompactionShowHoodiePathCommand(path: String, limit: Int)
     ShowCompactionProcedure.builder.get().build.call(procedureArgs)
   }
 
-  override val output: Seq[Attribute] = {
-    Seq(
-      AttributeReference("instant", StringType, nullable = false)(),
-      AttributeReference("action", StringType, nullable = false)(),
-      AttributeReference("size", IntegerType, nullable = false)()
-    )
-  }
+  override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala
index f3f0a8e529..afd15d5153 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
-import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.sql.hudi.command.procedures.ShowCompactionProcedure
 import org.apache.spark.sql.{Row, SparkSession}
 
 @Deprecated
@@ -32,11 +32,5 @@ case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
     CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession)
   }
 
-  override val output: Seq[Attribute] = {
-    Seq(
-      AttributeReference("timestamp", StringType, nullable = false)(),
-      AttributeReference("action", StringType, nullable = false)(),
-      AttributeReference("size", IntegerType, nullable = false)()
-    )
-  }
+  override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 231d0939cc..b353aebe50 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.types._
 
 import java.util.function.Supplier
+
 import scala.collection.JavaConverters._
 
 class RunClusteringProcedure extends BaseProcedure
@@ -50,13 +51,15 @@ class RunClusteringProcedure extends BaseProcedure
     ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
-    ProcedureParameter.optional(3, "order", DataTypes.StringType, None)
+    ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
+    ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
-    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
-    StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty)
+    StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
   ))
 
   def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -70,6 +73,7 @@ class RunClusteringProcedure extends BaseProcedure
     val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
     val predicate = getArgValueOrDefault(args, PARAMETERS(2))
     val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
+    val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -114,7 +118,27 @@ class RunClusteringProcedure extends BaseProcedure
     pendingClustering.foreach(client.cluster(_, true))
     logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
       s" time cost: ${System.currentTimeMillis() - startTs}ms.")
-    Seq.empty[Row]
+
+    val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+      .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
+      .toSeq
+      .sortBy(f => f.getTimestamp)
+      .reverse
+
+    val clusteringPlans = clusteringInstants.map(instant =>
+      ClusteringUtils.getClusteringPlan(metaClient, instant)
+    )
+
+    if (showInvolvedPartitions) {
+      clusteringPlans.map { p =>
+        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+          p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+      }
+    } else {
+      clusteringPlans.map { p =>
+        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+      }
+    }
   }
 
   override def build: Procedure = new RunClusteringProcedure()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 9bca33f388..3e5a7e29e4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -20,10 +20,9 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
-import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
+import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option => HOption}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
@@ -47,7 +46,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
-    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty)
+    StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty)
   ))
 
   def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -66,13 +67,12 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
 
+    var willCompactionInstants: Seq[String] = Seq.empty
     operation match {
       case "schedule" =>
         val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
         if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
-          Seq(Row(instantTime))
-        } else {
-          Seq.empty[Row]
+          willCompactionInstants = Seq(instantTime)
         }
       case "run" =>
         // Do compaction
@@ -81,7 +81,7 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
           .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
           .map(_.getTimestamp)
           .toSeq.sortBy(f => f)
-        val willCompactionInstants = if (instantTimestamp.isEmpty) {
+        willCompactionInstants = if (instantTimestamp.isEmpty) {
           if (pendingCompactionInstants.nonEmpty) {
             pendingCompactionInstants
           } else { // If there are no pending compaction, schedule to generate one.
@@ -102,9 +102,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
               s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
           }
         }
+
         if (willCompactionInstants.isEmpty) {
           logInfo(s"No need to compaction on $basePath")
-          Seq.empty[Row]
         } else {
           logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
           val timer = new HoodieTimer
@@ -116,10 +116,21 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
           }
           logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
             s" spend: ${timer.endTimer()}ms")
-          Seq.empty[Row]
         }
       case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
     }
+
+    val compactionInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+      .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
+      .toSeq
+      .sortBy(p => p.getTimestamp)
+      .reverse
+
+    compactionInstants.map(instant =>
+      (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
+    ).map { case (instant, plan) =>
+      Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
+    }
   }
 
   private def handleResponse(metadata: HoodieCommitMetadata): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
index a9d808217c..092610119e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
@@ -17,26 +17,31 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.util.ClusteringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
 import java.util.function.Supplier
+
 import scala.collection.JavaConverters._
 
 class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
-    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20)
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
+    ProcedureParameter.optional(3, "show_involved_partition", DataTypes.BooleanType, false)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
-    StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty)
+    StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
   ))
 
   def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -49,12 +54,32 @@ class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with S
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
     val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
-    ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala.map { p =>
-      Row(p.getLeft.getTimestamp, p.getRight.getInputGroups.size())
-    }.toSeq.take(limit)
+    val clusteringInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
+      .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
+      .toSeq
+      .sortBy(f => f.getTimestamp)
+      .reverse
+      .take(limit)
+
+    val clusteringPlans = clusteringInstants.map(instant =>
+      ClusteringUtils.getClusteringPlan(metaClient, instant)
+    )
+
+    if (showInvolvedPartitions) {
+      clusteringPlans.map { p =>
+        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+          p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+      }
+    } else {
+      clusteringPlans.map { p =>
+        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+          p.get().getLeft.getState.name(), "*")
+      }
+    }
   }
 
   override def build: Procedure = new ShowClusteringProcedure()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
index d484d65323..7a7bb2cf9d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
@@ -44,8 +44,8 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
-    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
-    StructField("size", DataTypes.IntegerType, nullable = true, Metadata.empty)
+    StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty)
   ))
 
   def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -64,17 +64,17 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
 
     assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
       s"Cannot show compaction on a Non Merge On Read table.")
-    val timeLine = metaClient.getActiveTimeline
-    val compactionInstants = timeLine.getInstants.iterator().asScala
+    val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
       .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
       .toSeq
       .sortBy(f => f.getTimestamp)
       .reverse
       .take(limit)
-    val compactionPlans = compactionInstants.map(instant =>
-      (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)))
-    compactionPlans.map { case (instant, plan) =>
-      Row(instant.getTimestamp, instant.getAction, plan.getOperations.size())
+
+    compactionInstants.map(instant =>
+      (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
+    ).map { case (instant, plan) =>
+      Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index f975651bd7..df4d8c90e2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -20,10 +20,9 @@
 package org.apache.spark.sql.hudi.procedure
 
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
-
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
 
 import scala.collection.JavaConverters.asScalaIteratorConverter
@@ -64,28 +63,22 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
         val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime
         client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty())
         checkAnswer(s"call show_clustering('$tableName')")(
-          Seq(firstScheduleInstant, 3),
-          Seq(secondScheduleInstant, 1)
+          Seq(secondScheduleInstant, 1, HoodieInstant.State.REQUESTED.name(), "*"),
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
         )
 
         // Do clustering for all clustering plan generated above, and no new clustering
         // instant will be generated because of there is no commit after the second
         // clustering plan generated
-        spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
+        checkAnswer(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)")(
+          Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "ts=1003"),
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001,ts=1002")
+        )
 
         // No new commits
         val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
         assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant))
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002),
-          Seq(4, "a4", 10.0, 1003)
-        )
-        // After clustering there should be no pending clustering.
-        checkAnswer(s"call show_clustering(table => '$tableName')")()
-
         // Check the number of finished clustering instants
         val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
           .getInstants
@@ -94,10 +87,23 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
           .toSeq
         assertResult(2)(finishedClustering.size)
 
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1000),
+          Seq(2, "a2", 10.0, 1001),
+          Seq(3, "a3", 10.0, 1002),
+          Seq(4, "a4", 10.0, 1003)
+        )
+
+        // After clustering there should be no pending clustering and all clustering instants should be completed
+        checkAnswer(s"call show_clustering(table => '$tableName')")(
+          Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "*"),
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*")
+        )
+
         // Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
         spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
         spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
-        spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')")
+        spark.sql(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)").show()
 
         val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
           .findInstantsAfter(secondScheduleInstant)
@@ -142,7 +148,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
              | location '$basePath'
        """.stripMargin)
 
-        spark.sql(s"call run_clustering(path => '$basePath')")
+        spark.sql(s"call run_clustering(path => '$basePath')").show()
         checkAnswer(s"call show_clustering(path => '$basePath')")()
 
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
@@ -152,18 +158,22 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
         // Generate the first clustering plan
         val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
         client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
-        checkAnswer(s"call show_clustering(path => '$basePath')")(
-          Seq(firstScheduleInstant, 3)
+        checkAnswer(s"call show_clustering(path => '$basePath', show_involved_partition => true)")(
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "ts=1000,ts=1001,ts=1002")
         )
         // Do clustering for all the clustering plan
-        spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')")
+        checkAnswer(s"call run_clustering(path => '$basePath', order => 'ts')")(
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*")
+        )
+
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
           Seq(1, "a1", 10.0, 1000),
           Seq(2, "a2", 10.0, 1001),
           Seq(3, "a3", 10.0, 1002)
         )
+
         val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
-        HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant)
+        assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant))
 
         // Check the number of finished clustering instants
         var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -176,7 +186,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
         // Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
         spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
         spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-        spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')")
+        val resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L', show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(resultA.length)
+        assertResult("ts=1003,ts=1004")(resultA(0)(3))
+
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
           Seq(1, "a1", 10.0, 1000),
           Seq(2, "a2", 10.0, 1001),
@@ -220,6 +235,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
         val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
 
         // Test partition pruning with single predicate
+        var resultA: Array[Seq[Any]] = Array.empty
+
         {
           spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
           spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
@@ -230,7 +247,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')")
+          resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts', show_involved_partition => true)")
+            .collect()
+            .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+          assertResult(1)(resultA.length)
+          assertResult("ts=1000,ts=1001")(resultA(0)(3))
 
           // There is 1 completed clustering instant
           val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -245,9 +266,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
           val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp)
           assertResult(true)(clusteringPlan.isPresent)
           assertResult(2)(clusteringPlan.get().getInputGroups.size())
+          assertResult(resultA(0)(1))(clusteringPlan.get().getInputGroups.size())
 
-          // No pending clustering instant
-          checkAnswer(s"call show_clustering(table => '$tableName')")()
+          // All clustering instants are completed
+          checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
+            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001")
+          )
 
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
             Seq(1, "a1", 10.0, 1000),
@@ -257,6 +281,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
         }
 
         // Test partition pruning with {@code And} predicates
+        var resultB: Array[Seq[Any]] = Array.empty
+
         {
           spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
           spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
@@ -267,7 +293,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')")
+          resultB = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts', show_involved_partition => true)")
+            .collect()
+            .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+          assertResult(1)(resultB.length)
+          assertResult("ts=1002,ts=1003,ts=1004,ts=1005")(resultB(0)(3))
 
           // There are 2 completed clustering instants
           val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -283,8 +313,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
           assertResult(true)(clusteringPlan.isPresent)
           assertResult(4)(clusteringPlan.get().getInputGroups.size())
 
-          // No pending clustering instant
-          checkAnswer(s"call show_clustering(table => '$tableName')")()
+          // All clustering instants are completed
+          checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
+            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
+            Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005")
+          )
 
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
             Seq(1, "a1", 10.0, 1000),
@@ -297,6 +330,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
         }
 
         // Test partition pruning with {@code And}-{@code Or} predicates
+        var resultC: Array[Seq[Any]] = Array.empty
+
         {
           spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)")
           spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)")
@@ -308,7 +343,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')")
+          resultC = spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts', show_involved_partition => true)")
+            .collect()
+            .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+          assertResult(1)(resultC.length)
+          assertResult("ts=1006,ts=1007,ts=1009")(resultC(0)(3))
 
           // There are 3 completed clustering instants
           val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -324,8 +363,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase {
           assertResult(true)(clusteringPlan.isPresent)
           assertResult(3)(clusteringPlan.get().getInputGroups.size())
 
-          // No pending clustering instant
-          checkAnswer(s"call show_clustering(table => '$tableName')")()
+          // All clustering instants are completed
+          checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
+            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
+            Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005"),
+            Seq(resultC(0).head, resultC(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1006,ts=1007,ts=1009")
+          )
 
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
             Seq(1, "a1", 10.0, 1000),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 0f6f96f911..39332d8591 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.hudi.procedure
 
+import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
 
 class TestCompactionProcedure extends HoodieSparkSqlTestBase {
@@ -48,22 +49,52 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
       spark.sql(s"update $tableName set price = 11 where id = 1")
 
-      spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
+      // Schedule the first compaction
+      val resultA = spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
+        .collect()
+        .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
+
       spark.sql(s"update $tableName set price = 12 where id = 2")
-      spark.sql(s"call run_compaction('schedule', '$tableName')")
-      val compactionRows = spark.sql(s"call show_compaction(table => '$tableName', limit => 10)").collect()
-      val timestamps = compactionRows.map(_.getString(0))
+
+      // Schedule the second compaction
+      val resultB = spark.sql(s"call run_compaction('schedule', '$tableName')")
+        .collect()
+        .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
+
+      assertResult(1)(resultA.length)
+      assertResult(1)(resultB.length)
+      val showCompactionSql: String = s"call show_compaction(table => '$tableName', limit => 10)"
+      checkAnswer(showCompactionSql)(
+        resultA(0),
+        resultB(0)
+      )
+
+      val compactionRows = spark.sql(showCompactionSql).collect()
+      val timestamps = compactionRows.map(_.getString(0)).sorted
       assertResult(2)(timestamps.length)
 
-      spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})")
+      // Execute the second scheduled compaction instant actually
+      checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})")(
+        Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name())
+      )
       checkAnswer(s"select id, name, price, ts from $tableName order by id")(
         Seq(1, "a1", 11.0, 1000),
         Seq(2, "a2", 12.0, 1000),
         Seq(3, "a3", 10.0, 1000),
         Seq(4, "a4", 10.0, 1000)
       )
-      assertResult(1)(spark.sql(s"call show_compaction('$tableName')").collect().length)
-      spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")
+
+      // A compaction action eventually becomes commit when completed, so show_compaction
+      // can only see the first scheduled compaction instant
+      val resultC = spark.sql(s"call show_compaction('$tableName')")
+        .collect()
+        .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
+      assertResult(1)(resultC.length)
+      assertResult(resultA)(resultC)
+
+      checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")(
+        Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name())
+      )
       checkAnswer(s"select id, name, price, ts from $tableName order by id")(
         Seq(1, "a1", 11.0, 1000),
         Seq(2, "a2", 12.0, 1000),
@@ -98,25 +129,40 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
       spark.sql(s"update $tableName set price = 11 where id = 1")
 
-      spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")
+      checkAnswer(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")()
       checkAnswer(s"select id, name, price, ts from $tableName order by id")(
         Seq(1, "a1", 11.0, 1000),
         Seq(2, "a2", 10.0, 1000),
         Seq(3, "a3", 10.0, 1000)
       )
       assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
-      // schedule compaction first
+
       spark.sql(s"update $tableName set price = 12 where id = 1")
-      spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')")
 
-      // schedule compaction second
+      // Schedule the first compaction
+      val resultA = spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')")
+        .collect()
+        .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
+
       spark.sql(s"update $tableName set price = 12 where id = 2")
-      spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')")
 
-      // show compaction
-      assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
-      // run compaction for all the scheduled compaction
-      spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")
+      // Schedule the second compaction
+      val resultB = spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')")
+        .collect()
+        .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
+
+      assertResult(1)(resultA.length)
+      assertResult(1)(resultB.length)
+      checkAnswer(s"call show_compaction(path => '${tmp.getCanonicalPath}')")(
+        resultA(0),
+        resultB(0)
+      )
+
+      // Run compaction for all the scheduled compaction
+      checkAnswer(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")(
+        Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name()),
+        Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name())
+      )
 
       checkAnswer(s"select id, name, price, ts from $tableName order by id")(
         Seq(1, "a1", 12.0, 1000),