Posted to commits@hudi.apache.org by da...@apache.org on 2022/02/14 03:13:31 UTC

[hudi] branch master updated: [HUDI-3370] The files recorded in the commit may not match the actual ones for MOR Compaction (#4753)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 76e2faa  [HUDI-3370] The files recorded in the commit may not match the actual ones for MOR Compaction (#4753)
76e2faa is described below

commit 76e2faa28dc1050e9e6ebe5b33dac1972b781eba
Author: YueZhang <69...@users.noreply.github.com>
AuthorDate: Mon Feb 14 11:12:52 2022 +0800

    [HUDI-3370] The files recorded in the commit may not match the actual ones for MOR Compaction (#4753)
    
    * use HoodieCommitMetadata to replace writeStatuses computation
    
    Co-authored-by: yuezhang <yu...@freewheel.tv>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 13 +++++-----
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 26 ++++++-------------
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  8 +++---
 .../apache/hudi/client/HoodieSparkCompactor.java   | 15 ++++++-----
 .../apache/hudi/client/SparkRDDWriteClient.java    | 29 +++++++++-------------
 .../hudi/client/TestHoodieClientMultiWriter.java   |  5 ++--
 .../hudi/table/TestHoodieMergeOnReadTable.java     | 11 ++++----
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java | 12 ++++++---
 .../TestHoodieSparkMergeOnReadTableRollback.java   | 28 +++++++++++++--------
 .../hudi/common/model/HoodieCommitMetadata.java    |  6 +++++
 .../examples/spark/HoodieWriteClientExample.java   |  6 +++--
 .../hudi/sink/compact/CompactionCommitSink.java    |  8 +++++-
 .../integ/testsuite/HoodieTestSuiteWriter.java     | 13 ++++++++--
 .../hudi/command/CompactionHoodiePathCommand.scala | 25 ++++++++-----------
 .../apache/hudi/utilities/HoodieClusteringJob.java | 19 ++------------
 .../org/apache/hudi/utilities/HoodieCompactor.java |  5 ++--
 .../org/apache/hudi/utilities/UtilHelpers.java     | 14 +++++++++++
 17 files changed, 129 insertions(+), 114 deletions(-)
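
The caller-facing effect of this change is that compact() now returns a HoodieWriteMetadata wrapper whose embedded HoodieCommitMetadata is passed straight to commitCompaction(), so the file stats recorded in the commit instant come from the compaction itself rather than from a re-collected set of WriteStatus objects. A minimal sketch of the updated flow, assuming an already configured SparkRDDWriteClient and a table with a schedulable compaction (not taken verbatim from the patch):

    // Sketch only: `client` is assumed to be a configured SparkRDDWriteClient.
    String instantTime = client.scheduleCompaction(Option.empty()).get();

    // compact() now returns HoodieWriteMetadata instead of a bare JavaRDD<WriteStatus>.
    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
        client.compact(instantTime);

    // The commit metadata already carries the HoodieWriteStats for the files the
    // compaction actually produced, so it is handed to commitCompaction() directly.
    HoodieCommitMetadata commitMetadata = compactionMetadata.getCommitMetadata().get();
    client.commitCompaction(instantTime, commitMetadata, Option.empty());
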

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 2414a9f..14e71f1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -883,7 +883,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param compactionInstantTime Compaction Instant Time
    * @return Collection of WriteStatus to inspect errors and counts
    */
-  public O compact(String compactionInstantTime) {
+  public HoodieWriteMetadata<O> compact(String compactionInstantTime) {
     return compact(compactionInstantTime, config.shouldAutoCommit());
   }
 
@@ -891,17 +891,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file.
    *
    * @param compactionInstantTime Compaction Instant Time
-   * @param writeStatuses Collection of WriteStatus to inspect errors and counts
+   * @param metadata All the metadata that gets stored along with a commit
    * @param extraMetadata Extra Metadata to be stored
    */
-  public abstract void commitCompaction(String compactionInstantTime, O writeStatuses,
-                                        Option<Map<String, String>> extraMetadata) throws IOException;
+  public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata,
+                                        Option<Map<String, String>> extraMetadata);
 
   /**
    * Commit Compaction and track metrics.
    */
-  protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
-                                             HoodieTable<T, I, K, O> table, String compactionCommitTime);
+  protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable<T, I, K, O> table, String compactionCommitTime);
 
   /**
    * Get inflight time line exclude compaction and clustering.
@@ -1023,7 +1022,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param compactionInstantTime Compaction Instant Time
    * @return Collection of Write Status
    */
-  protected abstract O compact(String compactionInstantTime, boolean shouldComplete);
+  protected abstract HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete);
 
   /**
    * Performs a compaction operation on a table, serially before or after an insert/upsert action.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 50477c3..c3d977e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -39,7 +39,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
@@ -68,7 +67,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.text.ParseException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -346,23 +344,20 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public void commitCompaction(
       String compactionInstantTime,
-      List<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) throws IOException {
+      HoodieCommitMetadata metadata,
+      Option<Map<String, String>> extraMetadata) {
     HoodieFlinkTable<T> table = getHoodieTable();
-    HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
-        table, compactionInstantTime, HoodieList.of(writeStatuses), config.getSchema());
     extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
-    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
+    completeCompaction(metadata, table, compactionInstantTime);
   }
 
   @Override
   public void completeCompaction(
       HoodieCommitMetadata metadata,
-      List<WriteStatus> writeStatuses,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
-    List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
@@ -391,16 +386,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected List<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
+  protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
     // only used for metadata table, the compaction happens in single thread
-    try {
-      List<WriteStatus> writeStatuses =
-          getHoodieTable().compact(context, compactionInstantTime).getWriteStatuses();
-      commitCompaction(compactionInstantTime, writeStatuses, Option.empty());
-      return writeStatuses;
-    } catch (IOException e) {
-      throw new HoodieException("Error while compacting instant: " + compactionInstantTime);
-    }
+    HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
+    commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
+    return compactionMetadata;
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 7af24c8..f365f29 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -44,7 +44,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -210,21 +209,20 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
 
   @Override
   public void commitCompaction(String compactionInstantTime,
-                               List<WriteStatus> writeStatuses,
-                               Option<Map<String, String>> extraMetadata) throws IOException {
+                               HoodieCommitMetadata metadata,
+                               Option<Map<String, String>> extraMetadata) {
     throw new HoodieNotSupportedException("CommitCompaction is not supported in HoodieJavaClient");
   }
 
   @Override
   protected void completeCompaction(HoodieCommitMetadata metadata,
-                                    List<WriteStatus> writeStatuses,
                                     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                     String compactionCommitTime) {
     throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaClient");
   }
 
   @Override
-  protected List<WriteStatus> compact(String compactionInstantTime,
+  protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime,
                                       boolean shouldComplete) {
     throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient");
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java
index ca5684a..b3dc27b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java
@@ -22,14 +22,17 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.io.IOException;
+import java.util.List;
 
 public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends BaseCompactor<T,
     JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
@@ -43,12 +46,12 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends BaseCom
   }
 
   @Override
-  public void compact(HoodieInstant instant) throws IOException {
+  public void compact(HoodieInstant instant) {
     LOG.info("Compactor executing compaction " + instant);
     SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) compactionClient;
-    JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
-    long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instant.getTimestamp());
+    List<HoodieWriteStat> writeStats = compactionMetadata.getCommitMetadata().get().getWriteStats();
+    long numWriteErrors = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
     if (numWriteErrors != 0) {
       // We treat even a single error in compaction as fatal
       LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
@@ -56,6 +59,6 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends BaseCom
           "Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
     }
     // Commit compaction
-    writeClient.commitCompaction(instant.getTimestamp(), res, Option.empty());
+    writeClient.commitCompaction(instant.getTimestamp(), compactionMetadata.getCommitMetadata().get(), Option.empty());
   }
 }
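
The same pattern applies to external tooling that used to collect the WriteStatus RDD to count failures: the error count can now be derived from the write stats embedded in the commit metadata, with no collect() back to the driver. A rough equivalent of the check HoodieSparkCompactor performs, assuming compactionMetadata was obtained from writeClient.compact() as in the sketch above:

    // Hedged sketch of the stats-based error check.
    List<HoodieWriteStat> writeStats =
        compactionMetadata.getCommitMetadata().get().getWriteStats();
    long numWriteErrors = writeStats.stream()
        .mapToLong(HoodieWriteStat::getTotalWriteErrors)
        .sum();
    if (numWriteErrors > 0) {
      // A single write error is treated as fatal for the compaction.
      throw new HoodieException(
          "Compaction for instant " + instantTime + " failed with " + numWriteErrors + " write errors");
    }
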
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 63f8804..2fb27fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -65,7 +65,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.List;
@@ -286,20 +285,18 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
+  public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
-    HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
-        table, compactionInstantTime, HoodieJavaRDD.of(writeStatuses), config.getSchema());
     extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
-    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
+    completeCompaction(metadata, table, compactionInstantTime);
   }
 
   @Override
-  protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
+  protected void completeCompaction(HoodieCommitMetadata metadata,
                                     HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                     String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
-    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
@@ -327,7 +324,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
+  protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, true);
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
@@ -339,11 +336,10 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     compactionTimer = metrics.getCompactionCtx();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
         table.compact(context, compactionInstantTime);
-    JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
     if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
+      completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
     }
-    return statuses;
+    return compactionMetadata;
   }
 
   @Override
@@ -359,15 +355,14 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     clusteringTimer = metrics.getClusteringCtx();
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = table.cluster(context, clusteringInstant);
-    JavaRDD<WriteStatus> statuses = clusteringMetadata.getWriteStatuses();
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), statuses, table, clusteringInstant);
+      completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
     }
     return clusteringMetadata;
   }
 
-  private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
+  private void completeClustering(HoodieReplaceCommitMetadata metadata,
                                     HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                     String clusteringCommitTime) {
 
@@ -469,16 +464,16 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
-  private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
+  private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
                                     HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                     String commitInstant) {
 
     switch (tableServiceType) {
       case CLUSTER:
-        completeClustering((HoodieReplaceCommitMetadata) metadata, writeStatuses, table, commitInstant);
+        completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
         break;
       case COMPACT:
-        completeCompaction(metadata, writeStatuses, table, commitInstant);
+        completeCompaction(metadata, table, commitInstant);
         break;
       default:
         throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 035799c..c3cde74 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.hadoop.fs.Path;
@@ -364,8 +365,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
       latchCountDownAndWait(runCountDownLatch, 30000);
       if (tableType == HoodieTableType.MERGE_ON_READ) {
         assertDoesNotThrow(() -> {
-          JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) client2.compact("005");
-          client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
+          HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =  client2.compact("005");
+          client2.commitCompaction("005", compactionMetadata.getCommitMetadata().get(), Option.empty());
           validInstants.add("005");
         });
       }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index fb484f4..e6df537 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -45,6 +45,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -258,7 +259,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
 
       // Do a compaction
       String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
-      JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime);
+      HoodieWriteMetadata<JavaRDD<WriteStatus>> result = writeClient.compact(compactionInstantTime);
 
       // Verify that recently written compacted data file has no log file
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -275,8 +276,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
         for (FileSlice slice : groupedLogFiles) {
           assertEquals(0, slice.getLogFiles().count(), "After compaction there should be no log files visible on a full view");
         }
-        List<WriteStatus> writeStatuses = result.collect();
-        assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+        assertTrue(result.getCommitMetadata().get().getWritePartitionPaths().stream().anyMatch(part -> part.contentEquals(partitionPath)));
       }
 
       // Check the entire dataset has all records still
@@ -442,8 +442,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       // Test small file handling after compaction
       instantTime = "002";
       client.scheduleCompactionAtInstant(instantTime, Option.of(metadata.getExtraMetadata()));
-      statuses = (JavaRDD<WriteStatus>) client.compact(instantTime);
-      client.commitCompaction(instantTime, statuses, Option.empty());
+      HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instantTime);
+      statuses = compactionMetadata.getWriteStatuses();
+      client.commitCompaction(instantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
 
       // Read from commit file
       table = HoodieSparkTable.create(cfg, context());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 63f6e46..2955147 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -41,6 +42,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -56,6 +58,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -307,11 +310,12 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       assertTrue(numLogFiles > 0);
       // Do a compaction
       String instantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
-      statuses = (JavaRDD<WriteStatus>) writeClient.compact(instantTime);
+      HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instantTime);
       String extension = table.getBaseFileExtension();
-      assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count());
-      assertEquals(numLogFiles, statuses.count());
-      writeClient.commitCompaction(instantTime, statuses, Option.empty());
+      Collection<List<HoodieWriteStat>> stats = compactionMetadata.getCommitMetadata().get().getPartitionToWriteStats().values();
+      assertEquals(numLogFiles, stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count());
+      assertEquals(numLogFiles, stats.stream().mapToLong(Collection::size).sum());
+      writeClient.commitCompaction(instantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
     }
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index f5f9fb8..47e9935 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
@@ -50,6 +51,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
@@ -455,8 +457,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
 
       compactionInstantTime = "006";
       client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-      JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
-      client.commitCompaction(compactionInstantTime, ws, Option.empty());
+      HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(compactionInstantTime);
+      client.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
 
       allFiles = listAllBaseFilesInPath(hoodieTable);
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -543,8 +545,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
         metaClient = HoodieTableMetaClient.reload(metaClient);
         String compactionInstantTime = "005";
         client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-        JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
-        client.commitCompaction(compactionInstantTime, ws, Option.empty());
+        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(compactionInstantTime);
+        client.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
 
         validateRecords(cfg, metaClient, updates3);
         List<HoodieRecord> updates4 = updateAndGetRecords("006", client, dataGen, records);
@@ -755,11 +757,14 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
       assertTrue(numLogFiles > 0);
       // Do a compaction
       newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
-      statuses = (JavaRDD<WriteStatus>) writeClient.compact(newCommitTime);
+      HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(newCommitTime);
+      statuses = compactionMetadata.getWriteStatuses();
       // Ensure all log files have been compacted into base files
       String extension = table.getBaseFileExtension();
-      assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count());
-      assertEquals(numLogFiles, statuses.count());
+      Collection<List<HoodieWriteStat>> stats = compactionMetadata.getCommitMetadata().get().getPartitionToWriteStats().values();
+      assertEquals(numLogFiles, stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count());
+      assertEquals(numLogFiles, stats.stream().mapToLong(Collection::size).sum());
+
       //writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
       // Trigger a rollback of compaction
       table.getActiveTimeline().reload();
@@ -862,14 +867,15 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
   private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, long numLogFiles) throws IOException {
     // Do a compaction
     String instantTime = client.scheduleCompaction(Option.empty()).get().toString();
-    JavaRDD<WriteStatus> writeStatuses = (JavaRDD<WriteStatus>) client.compact(instantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instantTime);
 
     metaClient.reloadActiveTimeline();
     HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
     String extension = table.getBaseFileExtension();
-    assertEquals(numLogFiles, writeStatuses.map(status -> status.getStat().getPath().contains(extension)).count());
-    assertEquals(numLogFiles, writeStatuses.count());
-    client.commitCompaction(instantTime, writeStatuses, Option.empty());
+    Collection<List<HoodieWriteStat>> stats = compactionMetadata.getCommitMetadata().get().getPartitionToWriteStats().values();
+    assertEquals(numLogFiles, stats.stream().flatMap(Collection::stream).filter(state -> state.getPath().contains(extension)).count());
+    assertEquals(numLogFiles, stats.stream().mapToLong(Collection::size).sum());
+    client.commitCompaction(instantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
     return numLogFiles;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index d693d91..594c1ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -35,10 +35,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * All the metadata that gets stored along with a commit.
@@ -89,6 +91,10 @@ public class HoodieCommitMetadata implements Serializable {
     return partitionToWriteStats;
   }
 
+  public List<HoodieWriteStat> getWriteStats() {
+    return partitionToWriteStats.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+  }
+
   public String getMetadata(String metaKey) {
     return extraMetadata.get(metaKey);
   }
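
The new getWriteStats() accessor simply flattens the per-partition stats map, which is what the write clients and UtilHelpers.handleErrors() now rely on. Callers that previously flattened partitionToWriteStats by hand can use it directly; for example (a usage sketch, not from the patch):

    // Before: manual flattening of the per-partition map.
    List<HoodieWriteStat> stats = metadata.getPartitionToWriteStats().values().stream()
        .flatMap(Collection::stream)
        .collect(Collectors.toList());

    // After: the convenience accessor added in this commit.
    List<HoodieWriteStat> sameStats = metadata.getWriteStats();
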
diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index 35e4660..1afc180 100644
--- a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -38,6 +38,8 @@ import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
@@ -140,8 +142,8 @@ public class HoodieWriteClientExample {
       // compaction
       if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
         Option<String> instant = client.scheduleCompaction(Option.empty());
-        JavaRDD<WriteStatus> writeStatues = client.compact(instant.get());
-        client.commitCompaction(instant.get(), writeStatues, Option.empty());
+        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instant.get());
+        client.commitCompaction(instant.get(), compactionMetadata.getCommitMetadata().get(), Option.empty());
       }
 
     }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 5312735..ecd6693 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -20,12 +20,15 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieList;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -147,8 +150,11 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
+    HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
+        table, instant, HoodieList.of(statuses), writeClient.getConfig().getSchema());
+
     // commit the compaction
-    this.writeClient.commitCompaction(instant, statuses, Option.empty());
+    this.writeClient.commitCompaction(instant, metadata, Option.empty());
 
     // Whether to clean up the old log file when compaction
     if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index e46343c..a98c7f2 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -24,7 +24,9 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
@@ -33,6 +35,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
 import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
@@ -40,6 +43,9 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.avro.Schema;
@@ -215,7 +221,8 @@ public class HoodieTestSuiteWriter implements Serializable {
         }
       }
       if (instantTime.isPresent()) {
-        return (JavaRDD<WriteStatus>) writeClient.compact(instantTime.get());
+        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instantTime.get());
+        return compactionMetadata.getWriteStatuses();
       } else {
         return null;
       }
@@ -272,7 +279,9 @@ public class HoodieTestSuiteWriter implements Serializable {
         // Just stores the path where this batch of data is generated to
         extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
       }
-      writeClient.commitCompaction(instantTime.get(), records, Option.of(extraMetadata));
+      HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(writeClient.getConfig(), writeClient.getEngineContext());
+      HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(), HoodieJavaRDD.of(records), writeClient.getConfig().getSchema());
+      writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata));
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala
index 1363fb9..2f5c4d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.client.WriteStatus
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType}
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils}
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation
 import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
@@ -100,8 +99,8 @@ case class CompactionHoodiePathCommand(path: String,
           timer.startTimer()
           willCompactionInstants.foreach {compactionInstant =>
             val writeResponse = client.compact(compactionInstant)
-            handlerResponse(writeResponse)
-            client.commitCompaction(compactionInstant, writeResponse, HOption.empty())
+            handleResponse(writeResponse.getCommitMetadata.get())
+            client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
           }
           logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
             s" spend: ${timer.endTimer()}ms")
@@ -111,17 +110,13 @@ case class CompactionHoodiePathCommand(path: String,
     }
   }
 
-  private def handlerResponse(writeResponse: JavaRDD[WriteStatus]): Unit = {
+  private def handleResponse(metadata: HoodieCommitMetadata): Unit = {
+
     // Handle error
-    val error = writeResponse.rdd.filter(f => f.hasErrors).take(1).headOption
-    if (error.isDefined) {
-      if (error.get.hasGlobalError) {
-        throw error.get.getGlobalError
-      } else if (!error.get.getErrors.isEmpty) {
-        val key = error.get.getErrors.asScala.head._1
-        val exception = error.get.getErrors.asScala.head._2
-        throw new HoodieException(s"Error in write record: $key", exception)
-      }
+    val writeStats = metadata.getPartitionToWriteStats.entrySet().flatMap(e => e.getValue).toList
+    val errorsCount = writeStats.map(state => state.getTotalWriteErrors).sum
+    if (errorsCount > 0) {
+      throw new HoodieException(s" Found $errorsCount when writing record")
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index b7345a1..2663962 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -49,7 +48,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class HoodieClusteringJob {
 
@@ -216,7 +214,7 @@ public class HoodieClusteringJob {
       }
       Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata();
 
-      return handleErrors(commitMetadata.get(), cfg.clusteringInstantTime);
+      return UtilHelpers.handleErrors(commitMetadata.get(), cfg.clusteringInstantTime);
     }
   }
 
@@ -271,20 +269,7 @@ public class HoodieClusteringJob {
       LOG.info("The schedule instant time is " + instantTime.get());
       LOG.info("Step 2: Do cluster");
       Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get(), true).getCommitMetadata();
-      return handleErrors(metadata.get(), instantTime.get());
+      return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
     }
   }
-
-  private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
-    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
-        e.getValue().stream()).collect(Collectors.toList());
-    long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
-    if (errorsCount == 0) {
-      LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
-      return 0;
-    }
-
-    LOG.error(String.format("Import failed with %d errors.", errorsCount));
-    return -1;
-  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 5658575..ce2be7d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -266,8 +267,8 @@ public class HoodieCompactor {
           throw new HoodieCompactionException("There is no scheduled compaction in the table.");
         }
       }
-      JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
-      return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
+      HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(cfg.compactionInstantTime);
+      return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index b9eda63..8690ff1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -26,7 +26,9 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Functions.Function1;
@@ -303,6 +305,18 @@ public class UtilHelpers {
     return -1;
   }
 
+  public static int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
+    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+    long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
+    if (errorsCount == 0) {
+      LOG.info(String.format("Finish job with %s instant time.", instantTime));
+      return 0;
+    }
+
+    LOG.error(String.format("Job failed with %d errors.", errorsCount));
+    return -1;
+  }
+
   /**
    * Returns a factory for creating connections to the given JDBC URL.
    *