You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/10/25 02:34:14 UTC

[incubator-hudi] branch master updated: [HUDI-169] Speed up rolling back of instants (#968)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c23da69  [HUDI-169] Speed up rolling back of instants (#968)
c23da69 is described below

commit c23da694ccbbf1f0ebcc5ed770dd56c74c1e06d7
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Thu Oct 24 19:34:00 2019 -0700

    [HUDI-169] Speed up rolling back of instants (#968)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  13 +
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |  70 ++---
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 328 +++++++++------------
 .../org/apache/hudi/table/RollbackExecutor.java    | 233 +++++++++++++++
 .../org/apache/hudi/table/RollbackRequest.java     | 109 +++++++
 5 files changed, 506 insertions(+), 247 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3597556..16b223c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -51,6 +51,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
   private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
   private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
+  private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
+  private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
   private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
   private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
   private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
@@ -141,6 +143,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
   }
 
+  /**
+   * Reads the {@code hoodie.rollback.parallelism} property and parses it as an integer.
+   *
+   * @return the configured rollback parallelism
+   */
+  public int getRollbackParallelism() {
+    final String configuredParallelism = props.getProperty(ROLLBACK_PARALLELISM);
+    return Integer.parseInt(configuredParallelism);
+  }
+
+
   public int getWriteBufferLimitBytes() {
     return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
   }
@@ -562,6 +569,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    /**
+     * Stores the given parallelism under the {@code hoodie.rollback.parallelism} property.
+     *
+     * @param rollbackParallelism parallelism to record for rollbacks
+     * @return this builder, to allow chaining
+     */
+    public Builder withRollbackParallelism(int rollbackParallelism) {
+      final String parallelismValue = Integer.toString(rollbackParallelism);
+      props.setProperty(ROLLBACK_PARALLELISM, parallelismValue);
+      return this;
+    }
+
     public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
       props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
       return this;
@@ -651,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
           DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
+      // Rollback has its own default (DEFAULT_ROLLBACK_PARALLELISM) rather than the shuffle default
+      setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
+          DEFAULT_ROLLBACK_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
           DEFAULT_COMBINE_BEFORE_INSERT);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 6644f14..e18578d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -32,10 +32,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.WriteStatus;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.HoodieCleanStat;
@@ -74,7 +72,6 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import scala.Tuple2;
@@ -294,45 +291,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     }
   }
 
-  /**
-   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
-   */
-  protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath,
-      PathFilter filter) throws IOException {
-    logger.info("Cleaning path " + partitionPath);
-    FileSystem fs = getMetaClient().getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      boolean success = fs.delete(file.getPath(), false);
-      results.put(file, success);
-      logger.info("Delete file " + file.getPath() + "\t" + success);
-    }
-    return results;
-  }
-
-  /**
-   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
-   */
-  protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String commit,
-      String partitionPath) throws IOException {
-    logger.info("Cleaning path " + partitionPath);
-    FileSystem fs = getMetaClient().getFs();
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(".parquet")) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      boolean success = fs.delete(file.getPath(), false);
-      results.put(file, success);
-      logger.info("Delete file " + file.getPath() + "\t" + success);
-    }
-    return results;
-  }
-
   @Override
   public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
       throws IOException {
@@ -342,30 +300,38 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
         this.getInflightCommitTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     // Atomically unpublish the commits
     if (!inflights.contains(commit)) {
+      logger.info("Unpublishing " + commit);
       activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit));
     }
-    logger.info("Unpublished " + commit);
+
+    HoodieInstant instantToRollback = new HoodieInstant(false, actionType, commit);
+    Long startTime = System.currentTimeMillis();
 
     // delete all the data files for this commit
     logger.info("Clean out all parquet files generated for commit: " + commit);
+    List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback);
+
+    //TODO: We need to persist this as rollback workload and use it in case of partial failures
     List<HoodieRollbackStat> stats =
-        jsc.parallelize(FSUtils.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
-            config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
-              // Scan all partitions files with this commit time
-              final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
-              deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
-              return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                  .withDeletedFileResults(filesToDeletedStatus).build();
-            }).collect();
+        new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
 
     // Delete Inflight instant if enabled
     deleteInflightInstant(deleteInstants, activeTimeline, new HoodieInstant(true, actionType, commit));
+    logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
     return stats;
   }
 
+  /**
+   * Builds one rollback request per partition path of the table, each created through
+   * {@code RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction} for the given instant.
+   *
+   * @param instantToRollback instant being rolled back
+   * @return one rollback request for every listed partition path
+   * @throws IOException if the partition paths cannot be listed
+   */
+  private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
+      throws IOException {
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.metaClient.getFs(),
+        this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
+    return partitionPaths.stream()
+        .map(partitionPath ->
+            RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
+        .collect(Collectors.toList());
+  }
+
   /**
    * Delete Inflight instant if enabled
-   * 
+   *
    * @param deleteInstant Enable Deletion of Inflight instant
    * @param activeTimeline Hoodie active timeline
    * @param instantToBeDeleted Instant to be deleted
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index f9b4141..8dcb3bf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -19,22 +19,17 @@
 package org.apache.hudi.table;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.WriteStatus;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
@@ -47,11 +42,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.SyncableFileSystemView;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.FSUtils;
@@ -59,10 +49,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.func.MergeOnReadLazyInsertIterable;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.HoodieAppendHandle;
 import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
 import org.apache.log4j.LogManager;
@@ -70,7 +58,6 @@ import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 
 /**
  * Implementation of a more real-time read-optimized Hoodie Table where
@@ -180,7 +167,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   @Override
   public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
       throws IOException {
-
     // At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
     // feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
     // (commitToRollback).
@@ -198,140 +184,121 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     }
     logger.info("Unpublished " + commit);
     Long startTime = System.currentTimeMillis();
+    List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instantToRollback);
+    //TODO: We need to persist this as rollback workload and use it in case of partial failures
     List<HoodieRollbackStat> allRollbackStats =
-        jsc.parallelize(FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
-            config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
-              HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
-              HoodieRollbackStat hoodieRollbackStats = null;
-              // Need to put the path filter here since Filter is not serializable
-              // PathFilter to get all parquet files and log files that need to be deleted
-              PathFilter filter = (path) -> {
-                if (path.toString().contains(".parquet")) {
-                  String fileCommitTime = FSUtils.getCommitTime(path.getName());
-                  return commit.equals(fileCommitTime);
-                } else if (path.toString().contains(".log")) {
-                  // Since the baseCommitTime is the only commit for new log files, it's okay here
-                  String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
-                  return commit.equals(fileCommitTime);
-                }
-                return false;
-              };
-
-              final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
-
-              switch (instantToRollback.getAction()) {
-                case HoodieTimeline.COMMIT_ACTION:
-                  try {
-                    // Rollback of a commit should delete the newly created parquet files along with any log
-                    // files created with this as baseCommit. This is required to support multi-rollbacks in a MOR
-                    // table.
-                    super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-                    hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                        .withDeletedFileResults(filesToDeletedStatus).build();
-                    break;
-                  } catch (IOException io) {
-                    throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
-                  }
-                case HoodieTimeline.COMPACTION_ACTION:
-                  try {
-                    // If there is no delta commit present after the current commit (if compaction), no action, else we
-                    // need to make sure that a compaction commit rollback also deletes any log files written as part of
-                    // the
-                    // succeeding deltacommit.
-                    boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants()
-                        .findInstantsAfter(commit, 1).empty();
-                    if (higherDeltaCommits) {
-                      // Rollback of a compaction action with no higher deltacommit means that the compaction is
-                      // scheduled
-                      // and has not yet finished. In this scenario we should delete only the newly created parquet
-                      // files
-                      // and not corresponding base commit log files created with this as baseCommit since updates would
-                      // have been written to the log files.
-                      super.deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
-                      hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                          .withDeletedFileResults(filesToDeletedStatus).build();
-                    } else {
-                      // No deltacommits present after this compaction commit (inflight or requested). In this case, we
-                      // can also delete any log files that were created with this compaction commit as base
-                      // commit.
-                      super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-                      hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
-                          .withDeletedFileResults(filesToDeletedStatus).build();
-                    }
-                    break;
-                  } catch (IOException io) {
-                    throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
-                  }
-                case HoodieTimeline.DELTA_COMMIT_ACTION:
-                  // --------------------------------------------------------------------------------------------------
-                  // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
-                  // --------------------------------------------------------------------------------------------------
-                  // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries.
-                  // In
-                  // this scenario we would want to delete these log files.
-                  // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
-                  // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
-                  // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
-                  // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime
-                  // and
-                  // and hence will end up deleting these log files. This is done so there are no orphan log files
-                  // lying around.
-                  // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
-                  // taken in this scenario is a combination of (A.2) and (A.3)
-                  // ---------------------------------------------------------------------------------------------------
-                  // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
-                  // ---------------------------------------------------------------------------------------------------
-                  // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no
-                  // entries.
-                  // In this scenario, we delete all the parquet files written for the failed commit.
-                  // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
-                  // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
-                  // (B.3) Rollback triggered for first commit - Same as (B.1)
-                  // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
-                  // as well if the base parquet file gets deleted.
-                  try {
-                    HoodieCommitMetadata commitMetadata =
-                        HoodieCommitMetadata.fromBytes(
-                            metaClient.getCommitTimeline().getInstantDetails(new HoodieInstant(true,
-                                instantToRollback.getAction(), instantToRollback.getTimestamp())).get(),
-                            HoodieCommitMetadata.class);
-
-                    // read commit file and (either append delete blocks or delete file)
-                    Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
-
-                    // In case all data was inserts and the commit failed, delete the file belonging to that commit
-                    // We do not know fileIds for inserts (first inserts are either log files or parquet files),
-                    // delete all files for the corresponding failed commit, if present (same as COW)
-                    super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-                    final Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream().map(entry -> {
-                      Path filePath = entry.getKey().getPath();
-                      return FSUtils.getFileIdFromFilePath(filePath);
-                    }).collect(Collectors.toSet());
-
-                    // append rollback blocks for updates
-                    if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
-                      hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus,
-                          filesToNumBlocksRollback, deletedFiles);
-                    }
-                    break;
-                  } catch (IOException io) {
-                    throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
-                  }
-                default:
-                  break;
-              }
-              return hoodieRollbackStats;
-            }).filter(Objects::nonNull).collect();
-
+        new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);
     // Delete Inflight instants if enabled
-    deleteInflightInstant(deleteInstants, this.getActiveTimeline(),
-        new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()));
+    deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback
+        .getAction(), instantToRollback.getTimestamp()));
 
-    logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
+    logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
 
     return allRollbackStats;
   }
 
+  /**
+   * Generate all rollback requests needed to roll back this instant, without actually performing the rollback.
+   * Requests are computed per partition path in a Spark job bounded by the configured rollback parallelism.
+   * @param jsc JavaSparkContext
+   * @param instantToRollback Instant to Rollback
+   * @return list of rollback requests
+   * @throws IOException if the partition paths of the table cannot be listed
+   */
+  private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
+      throws IOException {
+    String commit = instantToRollback.getTimestamp();
+    List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
+        config.shouldAssumeDatePartitioning());
+    // Spark requires a positive slice count, so clamp to 1 even when the table has no partitions
+    int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
+    return jsc.parallelize(partitions, sparkPartitions)
+        .flatMap(partitionPath -> {
+          HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
+          List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
+          switch (instantToRollback.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+              logger.info("Rolling back commit action. Deleting data and log files written for this instant");
+              partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
+                  partitionPath, instantToRollback));
+              break;
+            case HoodieTimeline.COMPACTION_ACTION:
+              // If there is no delta commit present after the current commit (if compaction), no action, else we
+              // need to make sure that a compaction commit rollback also deletes any log files written as part of the
+              // succeeding deltacommit.
+              boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline()
+                  .filterCompletedInstants().findInstantsAfter(commit, 1).empty();
+              if (higherDeltaCommits) {
+                // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
+                // and has not yet finished. In this scenario we should delete only the newly created parquet files
+                // and not corresponding base commit log files created with this as baseCommit since updates would
+                // have been written to the log files.
+                logger.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
+                partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(
+                    partitionPath, instantToRollback));
+              } else {
+                // No deltacommits present after this compaction commit (inflight or requested). In this case, we
+                // can also delete any log files that were created with this compaction commit as base
+                // commit.
+                logger.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
+                    + " log files");
+                partitionRollbackRequests.add(
+                    RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath,
+                        instantToRollback));
+              }
+              break;
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+              // --------------------------------------------------------------------------------------------------
+              // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
+              // --------------------------------------------------------------------------------------------------
+              // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
+              // this scenario we would want to delete these log files.
+              // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
+              // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
+              // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
+              // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
+              // hence will end up deleting these log files. This is done so there are no orphan log files
+              // lying around.
+              // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
+              // taken in this scenario is a combination of (A.2) and (A.3)
+              // ---------------------------------------------------------------------------------------------------
+              // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
+              // ---------------------------------------------------------------------------------------------------
+              // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
+              // In this scenario, we delete all the parquet files written for the failed commit.
+              // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
+              // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
+              // (B.3) Rollback triggered for first commit - Same as (B.1)
+              // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
+              // as well if the base parquet file gets deleted.
+              try {
+                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+                    metaClient.getCommitTimeline().getInstantDetails(
+                        new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
+                        .get(), HoodieCommitMetadata.class);
+
+                // In case all data was inserts and the commit failed, delete the file belonging to that commit
+                // We do not know fileIds for inserts (first inserts are either log files or parquet files),
+                // delete all files for the corresponding failed commit, if present (same as COW)
+                partitionRollbackRequests.add(RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(
+                    partitionPath, instantToRollback));
+
+                // append rollback blocks for updates
+                if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+                  partitionRollbackRequests
+                      .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
+                }
+                break;
+              } catch (IOException io) {
+                throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
+              }
+            default:
+              break;
+          }
+          return partitionRollbackRequests.iterator();
+        }).filter(Objects::nonNull).collect();
+  }
+
   @Override
   public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
       throws HoodieIOException {
@@ -450,19 +417,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     }
   }
 
-  private Map<HeaderMetadataType, String> generateHeader(String commit) {
-    // generate metadata
-    Map<HeaderMetadataType, String> header = Maps.newHashMap();
-    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
-    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
-    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    return header;
-  }
+  private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
+      HoodieCommitMetadata commitMetadata) {
+    Preconditions.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
 
-  private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
-      HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
-      Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
     // wStat.getPrevCommit() might not give the right commit time in the following
     // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
     // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
@@ -470,47 +428,27 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     // baseCommit always by listing the file slice
     Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath)
         .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
-    commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
-      // Filter out stats without prevCommit since they are all inserts
-      return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null
-          && !deletedFiles.contains(wStat.getFileId());
-    }).forEach(wStat -> {
-      Writer writer = null;
-      String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
-      if (null != baseCommitTime) {
-        boolean success = false;
-        try {
-          writer = HoodieLogFormat.newWriterBuilder()
-              .onParentPath(FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath))
-              .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime).withFs(this.metaClient.getFs())
-              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-          // generate metadata
-          Map<HeaderMetadataType, String> header = generateHeader(commit);
-          // if update belongs to an existing log file
-          writer = writer.appendBlock(new HoodieCommandBlock(header));
-          success = true;
-        } catch (IOException | InterruptedException io) {
-          throw new HoodieRollbackException("Failed to rollback for commit " + commit, io);
-        } finally {
-          try {
-            if (writer != null) {
-              writer.close();
-            }
-            if (success) {
-              // This step is intentionally done after writer is closed. Guarantees that
-              // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
-              // cloud-storage : HUDI-168
-              filesToNumBlocksRollback.put(this.getMetaClient().getFs().getFileStatus(writer.getLogFile().getPath()),
-                  1L);
-            }
-          } catch (IOException io) {
-            throw new UncheckedIOException(io);
+    return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
+        .filter(wStat -> {
+          // Filter out stats without prevCommit since they are all inserts. NULL_COMMIT must be matched with
+          // equals(): a reference comparison (!=) only rejects the very same String instance
+          boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != null)
+              && !HoodieWriteStat.NULL_COMMIT.equals(wStat.getPrevCommit())
+              && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
+          if (validForRollback) {
+            // For sanity, log instant time can never be less than base-commit on which we are rolling back
+            Preconditions.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
+                wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
           }
-        }
-      }
-    });
-    return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback).build();
-  }
 
+          // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
+          // to delete and we should not step on it
+          return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
+              wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
+        }).map(wStat -> {
+          String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
+          return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
+              baseCommitTime, rollbackInstant);
+        }).collect(Collectors.toList());
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
new file mode 100644
index 0000000..8bdf371
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+/**
+ * Performs Rollback of Hoodie Tables
+ */
+public class RollbackExecutor implements Serializable {
+
+  private static final Logger logger = LogManager.getLogger(RollbackExecutor.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final HoodieWriteConfig config;
+
+  public RollbackExecutor(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Performs all rollback actions that we have collected in parallel.
+   *
+   * @param jsc Spark context used to run the requests in parallel
+   * @param instantToRollback instant being rolled back
+   * @param rollbackRequests rollback actions to execute, each naming a partition path and an action
+   * @return rollback stats, merged so that there is one entry per partition path
+   */
+  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc,
+      HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
+    // Accepts parquet and log files whose (base) commit time equals the instant being rolled back
+    SerializablePathFilter filter = (path) -> {
+      if (path.toString().contains(".parquet")) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return instantToRollback.getTimestamp().equals(fileCommitTime);
+      } else if (path.toString().contains(".log")) {
+        // Since the baseCommitTime is the only commit for new log files, it's okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return instantToRollback.getTimestamp().equals(fileCommitTime);
+      }
+      return false;
+    };
+
+    // Use at least one Spark partition, even when the list of requests is empty
+    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
+      final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
+      switch (rollbackRequest.getRollbackAction()) {
+        case DELETE_DATA_FILES_ONLY: {
+          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath());
+          return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case DELETE_DATA_AND_LOG_FILES: {
+          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
+          return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          Writer writer = null;
+          try {
+            writer = HoodieLogFormat.newWriterBuilder().onParentPath(
+                FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+                .withFileId(rollbackRequest.getFileId().get())
+                .overBaseCommit(rollbackRequest.getLatestBaseInstant().get())
+                .withFs(metaClient.getFs())
+                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+            // generate metadata
+            Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
+            // if update belongs to an existing log file
+            writer = writer.appendBlock(new HoodieCommandBlock(header));
+          } catch (InterruptedException ie) {
+            // Restore the interrupt flag so that the thread running this task can still observe it
+            Thread.currentThread().interrupt();
+            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, ie);
+          } catch (IOException io) {
+            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
+          } finally {
+            try {
+              if (writer != null) {
+                writer.close();
+              }
+            } catch (IOException io) {
+              throw new UncheckedIOException(io);
+            }
+          }
+
+          // This step is intentionally done after writer is closed. Guarantees that
+          // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+          // cloud-storage : HUDI-168
+          Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
+          filesToNumBlocksRollback.put(metaClient.getFs()
+              .getFileStatus(writer.getLogFile().getPath()), 1L);
+          return new Tuple2<String, HoodieRollbackStat>(rollbackRequest.getPartitionPath(),
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+      }
+    }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
+  }
+
+  /**
+   * Helper to merge 2 rollback-stats for a given partition
+   *
+   * @param stat1 HoodieRollbackStat
+   * @param stat2 HoodieRollbackStat
+   * @return Merged HoodieRollbackStat
+   */
+  private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) {
+    Preconditions.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath()));
+    final List<String> successDeleteFiles = new ArrayList<>();
+    final List<String> failedDeleteFiles = new ArrayList<>();
+    final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
+    if (stat1.getSuccessDeleteFiles() != null) {
+      successDeleteFiles.addAll(stat1.getSuccessDeleteFiles());
+    }
+    if (stat2.getSuccessDeleteFiles() != null) {
+      successDeleteFiles.addAll(stat2.getSuccessDeleteFiles());
+    }
+    if (stat1.getFailedDeleteFiles() != null) {
+      failedDeleteFiles.addAll(stat1.getFailedDeleteFiles());
+    }
+    if (stat2.getFailedDeleteFiles() != null) {
+      failedDeleteFiles.addAll(stat2.getFailedDeleteFiles());
+    }
+    if (stat1.getCommandBlocksCount() != null) {
+      commandBlocksCount.putAll(stat1.getCommandBlocksCount());
+    }
+    if (stat2.getCommandBlocksCount() != null) {
+      commandBlocksCount.putAll(stat2.getCommandBlocksCount());
+    }
+    return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+  }
+
+  /**
+   * Deletes the files under a partition path that the filter accepts and records each outcome in {@code results}
+   */
+  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+      Map<FileStatus, Boolean> results, String partitionPath,
+      PathFilter filter) throws IOException {
+    logger.info("Cleaning path " + partitionPath);
+    FileSystem fs = metaClient.getFs();
+    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+    for (FileStatus file : toBeDeleted) {
+      boolean success = fs.delete(file.getPath(), false);
+      results.put(file, success);
+      logger.info("Delete file " + file.getPath() + "\t" + success);
+    }
+    return results;
+  }
+
+  /**
+   * Deletes the parquet files under a partition path whose commit time equals the given commit
+   */
+  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+      Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(".parquet")) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    // Listing, deleting and logging are shared with the filter-based overload
+    return deleteCleanedFiles(metaClient, config, results, partitionPath, filter);
+  }
+
+  /**
+   * Builds the rollback command block header: instant time is the last active-timeline instant, target is commit
+   */
+  private Map<HeaderMetadataType, String> generateHeader(String commit) {
+    // generate metadata
+    Map<HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
+  /** A PathFilter that is also Serializable, so it can be captured by the function passed to mapToPair */
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java
new file mode 100644
index 0000000..0e61988
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Request for performing one rollback action
+ */
+public class RollbackRequest implements java.io.Serializable {
+
+  // Requests are distributed with JavaSparkContext.parallelize, hence serializable.
+  // NOTE(review): assumes HoodieInstant and Option are serializable as well - confirm
+  private static final long serialVersionUID = 1L;
+
+  /** Rollback Action Types */
+  public enum RollbackAction {
+    DELETE_DATA_FILES_ONLY,
+    DELETE_DATA_AND_LOG_FILES,
+    APPEND_ROLLBACK_BLOCK
+  }
+
+  /** Partition path that needs to be rolled-back */
+  private final String partitionPath;
+
+  /** Rollback Instant */
+  private final HoodieInstant rollbackInstant;
+
+  /** FileId in case of appending rollback block, empty for the delete actions */
+  private final Option<String> fileId;
+
+  /** Latest base instant needed for appending rollback block instant, empty for the delete actions */
+  private final Option<String> latestBaseInstant;
+
+  /** Rollback Action */
+  private final RollbackAction rollbackAction;
+
+  public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant,
+      Option<String> fileId, Option<String> latestBaseInstant, RollbackAction rollbackAction) {
+    this.partitionPath = partitionPath;
+    this.rollbackInstant = rollbackInstant;
+    this.fileId = fileId;
+    this.latestBaseInstant = latestBaseInstant;
+    this.rollbackAction = rollbackAction;
+  }
+
+  public static RollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath,
+      HoodieInstant rollbackInstant) {
+    return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
+        RollbackAction.DELETE_DATA_FILES_ONLY);
+  }
+
+  public static RollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath,
+      HoodieInstant rollbackInstant) {
+    return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
+        RollbackAction.DELETE_DATA_AND_LOG_FILES);
+  }
+
+  public static RollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId,
+      String baseInstant, HoodieInstant rollbackInstant) {
+    return new RollbackRequest(partitionPath, rollbackInstant, Option.of(fileId), Option.of(baseInstant),
+        RollbackAction.APPEND_ROLLBACK_BLOCK);
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public HoodieInstant getRollbackInstant() {
+    return rollbackInstant;
+  }
+
+  public Option<String> getFileId() {
+    return fileId;
+  }
+
+  public Option<String> getLatestBaseInstant() {
+    return latestBaseInstant;
+  }
+
+  public RollbackAction getRollbackAction() {
+    return rollbackAction;
+  }
+
+  /** Lists every field, so that a request appearing in a log or exception message is identifiable */
+  @Override
+  public String toString() {
+    return "RollbackRequest{partitionPath=" + partitionPath + ", rollbackInstant=" + rollbackInstant
+        + ", fileId=" + fileId + ", latestBaseInstant=" + latestBaseInstant
+        + ", rollbackAction=" + rollbackAction + '}';
+  }
+}