You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/21 05:41:51 UTC

[hudi] branch master updated: [HUDI-839] Introducing support for rollbacks using marker files (#1756)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ec89e9  [HUDI-839] Introducing support for rollbacks using marker files (#1756)
1ec89e9 is described below

commit 1ec89e9a94161aa5d48b8f170bf67633b389d997
Author: lw0090 <lw...@gmail.com>
AuthorDate: Tue Jul 21 13:41:42 2020 +0800

    [HUDI-839] Introducing support for rollbacks using marker files (#1756)
    
    * [HUDI-839] Introducing rollback strategy using marker files
    
     - Adds a new mechanism for rollbacks where it's based on the marker files generated during the write
     - Consequently, marker file/dir deletion now happens post commit, instead of during finalize
     - Marker files are also generated for AppendHandle, making it consistent throughout the write path
     - Until upgrade-downgrade mechanism can upgrade non-marker based inflight writes to marker based, this should only be turned on for new datasets.
     - Added marker dir deletion after successful commit/rollback, individual files are not deleted during finalize
     - Fail safe for deleting marker directories, now during timeline archival process
     - Added check to ensure completed instants are not rolled back using marker based strategy. This will be incorrect
     - Reworked tests to rollback inflight instants, instead of completed instants whenever necessary
     - Added an unit test for MarkerBasedRollbackStrategy
    
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .../cli/commands/TestArchivedCommitsCommand.java   |   4 +-
 .../hudi/cli/commands/TestCommitsCommand.java      |   4 +-
 .../hudi/client/AbstractHoodieWriteClient.java     |   6 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |  14 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  13 ++
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  18 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  12 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  15 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  30 +--
 .../src/main/java/org/apache/hudi/io/IOType.java   |  28 +++
 .../java/org/apache/hudi/table/HoodieTable.java    | 104 +++++------
 .../hudi/table/HoodieTimelineArchiveLog.java       |  91 +++++----
 .../java/org/apache/hudi/table/MarkerFiles.java    | 153 +++++++++++++++
 .../restore/CopyOnWriteRestoreActionExecutor.java  |   3 +-
 .../restore/MergeOnReadRestoreActionExecutor.java  |   3 +-
 .../rollback/BaseRollbackActionExecutor.java       | 103 ++++++----
 .../CopyOnWriteRollbackActionExecutor.java         |  43 +++--
 ...Helper.java => ListingBasedRollbackHelper.java} |  69 +++----
 ...quest.java => ListingBasedRollbackRequest.java} |  56 +++---
 .../rollback/MarkerBasedRollbackStrategy.java      | 161 ++++++++++++++++
 .../MergeOnReadRollbackActionExecutor.java         |  68 ++++---
 .../hudi/table/action/rollback/RollbackUtils.java  |  66 +++++++
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  27 ++-
 ...eLog.java => TestHoodieTimelineArchiveLog.java} |  45 +++--
 .../java/org/apache/hudi/table/TestCleaner.java    |  14 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     | 207 ++++++++++++++-------
 .../org/apache/hudi/table/TestMarkerFiles.java     | 148 +++++++++++++++
 .../table/action/commit/TestUpsertPartitioner.java |   2 +-
 .../rollback/HoodieClientRollbackTestBase.java     |  98 ++++++++++
 .../TestCopyOnWriteRollbackActionExecutor.java     | 198 ++++++++++++++++++++
 .../rollback/TestMarkerBasedRollbackStrategy.java  | 135 ++++++++++++++
 .../TestMergeOnReadRollbackActionExecutor.java     | 157 ++++++++++++++++
 .../table/action/rollback/TestRollbackUtils.java   | 120 ++++++++++++
 .../hudi/testutils/HoodieClientTestHarness.java    |   2 -
 .../hudi/testutils/HoodieClientTestUtils.java      |  49 ++++-
 .../hudi/testutils/HoodieMergeOnReadTestUtils.java |   2 +-
 .../hudi/testutils/HoodieTestDataGenerator.java    |  22 ++-
 .../org/apache/hudi/common/HoodieRollbackStat.java |  19 ++
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  65 ++-----
 .../hudi/common/testutils/FileSystemTestUtils.java |  15 ++
 .../common/testutils/HoodieCommonTestHarness.java  |  12 ++
 .../hudi/common/testutils/HoodieTestUtils.java     |  41 ++--
 .../functional/TestHoodieSnapshotExporter.java     |   5 +-
 43 files changed, 1941 insertions(+), 506 deletions(-)

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index ef303ad..313c1bc 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -91,8 +91,8 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
     metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
 
     // archive
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-    archiveLog.archiveIfRequired(hadoopConf);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
+    archiveLog.archiveIfRequired();
   }
 
   @AfterEach
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 8bacf4f..4761252 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -175,8 +175,8 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
 
     // archive
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-    archiveLog.archiveIfRequired(jsc.hadoopConfiguration());
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
+    archiveLog.archiveIfRequired();
 
     CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
     assertTrue(cr.isSuccess());
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 096bc2e..b922caa 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -117,7 +117,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      postCommit(metadata, instantTime, extraMetadata);
+      postCommit(table, metadata, instantTime, extraMetadata);
       emitCommitMetrics(instantTime, metadata, actionType);
       LOG.info("Committed " + instantTime);
     } catch (IOException e) {
@@ -144,11 +144,13 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
 
   /**
    * Post Commit Hook. Derived classes use this method to perform post-commit processing
+   *
+   * @param table         table to commit on
    * @param metadata      Commit Metadata corresponding to committed instant
    * @param instantTime   Instant Time
    * @param extraMetadata Additional Metadata passed by user
    */
-  protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);
+  protected abstract void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata);
 
   /**
    * Finalize Write operation.
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index a6d9d0d..30dfecb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -47,6 +47,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
@@ -323,7 +324,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
             result.getWriteStats().get().size());
       }
 
-      postCommit(result.getCommitMetadata().get(), instantTime, Option.empty());
+      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
 
       emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
           hoodieTable.getMetaClient().getCommitActionType());
@@ -332,9 +333,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   }
 
   @Override
-  protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) {
+  protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+
+      // Delete the marker directory for the instant.
+      new MarkerFiles(table, instantTime).quietDeleteMarkerDir();
+
       // Do an inline compaction if enabled
       if (config.isInlineCompaction()) {
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
@@ -343,8 +347,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
       }
       // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
-      archiveLog.archiveIfRequired(hadoopConf);
+      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
+      archiveLog.archiveIfRequired();
       autoCleanOnCommit(instantTime);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3b822f0..fb679f2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,6 +54,8 @@ import java.util.stream.Collectors;
 public class HoodieWriteConfig extends DefaultHoodieConfig {
 
   public static final String TABLE_NAME = "hoodie.table.name";
+  public static final String DEFAULT_ROLLBACK_USING_MARKERS = "false";
+  public static final String ROLLBACK_USING_MARKERS = "hoodie.rollback.using.markers";
   public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
   public static final String BASE_PATH_PROP = "hoodie.base.path";
   public static final String AVRO_SCHEMA = "hoodie.avro.schema";
@@ -197,6 +199,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
   }
 
+  public boolean shouldRollbackUsingMarkers() {
+    return Boolean.parseBoolean(props.getProperty(ROLLBACK_USING_MARKERS));
+  }
+
   public int getWriteBufferLimitBytes() {
     return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
   }
@@ -710,6 +716,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withRollbackUsingMarkers(boolean rollbackUsingMarkers) {
+      props.setProperty(ROLLBACK_USING_MARKERS, String.valueOf(rollbackUsingMarkers));
+      return this;
+    }
+
     public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
       props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
       return this;
@@ -807,6 +818,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
           DEFAULT_ROLLBACK_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
+          DEFAULT_ROLLBACK_USING_MARKERS);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
           DEFAULT_COMBINE_BEFORE_INSERT);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 12e3a07..e81c4ad 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -119,10 +119,11 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       SliceView rtView = hoodieTable.getSliceView();
       Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
       // Set the base commit time as the current instantTime for new inserts into log files
-      String baseInstantTime = instantTime;
+      String baseInstantTime;
       if (fileSlice.isPresent()) {
         baseInstantTime = fileSlice.get().getBaseInstantTime();
       } else {
+        baseInstantTime = instantTime;
         // This means there is no base data file, start appending to a new log file
         fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
         LOG.info("New InsertHandle for partition :" + partitionPath);
@@ -138,6 +139,12 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
         HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
             new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
         partitionMetadata.trySave(getPartitionId());
+
+        // Since the actual log file written to can be different based on when rollover happens, we use the
+        // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
+        // writers.
+        createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));
+
         this.writer = createLogWriter(fileSlice, baseInstantTime);
         this.currentLogFile = writer.getLogFile();
         ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
@@ -278,6 +285,11 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
     return writeStatus;
   }
 
+  @Override
+  public IOType getIOType() {
+    return IOType.APPEND;
+  }
+
   private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
       throws IOException, InterruptedException {
     Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
@@ -288,7 +300,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
         .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
         .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
         .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
-        .withRolloverLogWriteToken(writeToken).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+        .withRolloverLogWriteToken(writeToken)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
   }
 
   private void writeToBuffer(HoodieRecord<T> record) {
@@ -327,5 +340,4 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       numberOfRecords = 0;
     }
   }
-
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index c437f54..dfa63b0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
@@ -66,9 +67,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
       partitionMetadata.trySave(getPartitionId());
-      createMarkerFile(partitionPath);
-      this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchema,
-          this.sparkTaskContextSupplier);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
+      this.fileWriter =
+          HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
     }
@@ -146,6 +147,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
     return writeStatus;
   }
 
+  @Override
+  public IOType getIOType() {
+    return IOType.CREATE;
+  }
+
   /**
    * Performs actions to durably, persist the current changes and returns a WriteStatus object.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 0eaf3f2..e87cf3c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.SparkConfigUtils;
@@ -88,10 +87,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
     init(fileId, this.partitionPath, dataFileToBeMerged);
   }
 
-  public static Schema createHoodieWriteSchema(Schema originalSchema) {
-    return HoodieAvroUtils.addMetadataFields(originalSchema);
-  }
-
   @Override
   public Schema getWriterSchema() {
     return writerSchema;
@@ -113,8 +108,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       partitionMetadata.trySave(getPartitionId());
 
       oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
+      String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
       String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
-          + FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension())).toString();
+          + newFileName).toString();
       newFilePath = new Path(config.getBasePath(), relativePath);
 
       LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
@@ -127,7 +123,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);
 
       // Create Marker file
-      createMarkerFile(partitionPath);
+      createMarkerFile(partitionPath, newFileName);
 
       // Create the writer for writing the new version file
       fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
@@ -311,4 +307,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
   public WriteStatus getWriteStatus() {
     return writeStatus;
   }
+
+  @Override
+  public IOType getIOType() {
+    return IOType.MERGE;
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 4470b76..7fd3b42 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -39,6 +38,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.MarkerFiles;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -50,6 +50,7 @@ import java.io.IOException;
 public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends HoodieIOHandle {
 
   private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
+
   protected final Schema originalSchema;
   protected final Schema writerSchema;
   protected HoodieTimer timer;
@@ -97,28 +98,9 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath) {
-    Path markerPath = makeNewMarkerPath(partitionPath);
-    try {
-      LOG.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
-    } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, e);
-    }
-  }
-
-  /**
-   * THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.
-   */
-  private Path makeNewMarkerPath(String partitionPath) {
-    Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
-    Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
-    try {
-      fs.mkdirs(path); // create a new partition as needed.
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
-    }
-    return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, writeToken, fileId));
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.create(partitionPath, dataFileName, getIOType());
   }
 
   public Schema getWriterSchema() {
@@ -167,6 +149,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
 
   public abstract WriteStatus getWriteStatus();
 
+  public abstract IOType getIOType();
+
   @Override
   protected FileSystem getFileSystem() {
     return hoodieTable.getMetaClient().getFs();
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/IOType.java b/hudi-client/src/main/java/org/apache/hudi/io/IOType.java
new file mode 100644
index 0000000..aa6660e
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/IOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+/**
+ * Types of lower level I/O operations done on each file slice.
+ */
+public enum IOType {
+  MERGE,
+  CREATE,
+  APPEND
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index b545eae..14dd168 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -32,7 +32,6 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
@@ -378,26 +377,29 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
    * @throws HoodieIOException if some paths can't be finalized on storage
    */
   public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
-    cleanFailedWrites(jsc, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled());
-  }
-
-  /**
-   * Delete Marker directory corresponding to an instant.
-   *
-   * @param instantTs Instant Time
-   */
-  public void deleteMarkerDir(String instantTs) {
-    try {
-      FileSystem fs = getMetaClient().getFs();
-      Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
-      if (fs.exists(markerDir)) {
-        // For append only case, we do not write to marker dir. Hence, the above check
-        LOG.info("Removing marker directory=" + markerDir);
-        fs.delete(markerDir, true);
-      }
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
+    reconcileAgainstMarkers(jsc, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled());
+  }
+
+  private void deleteInvalidFilesByPartitions(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
+    // Now delete partially written files
+    jsc.parallelize(new ArrayList<>(invalidFilesByPartition.values()), config.getFinalizeWriteParallelism())
+        .map(partitionWithFileList -> {
+          final FileSystem fileSystem = metaClient.getFs();
+          LOG.info("Deleting invalid data files=" + partitionWithFileList);
+          if (partitionWithFileList.isEmpty()) {
+            return true;
+          }
+          // Delete
+          partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
+            try {
+              fileSystem.delete(new Path(file), false);
+            } catch (IOException e) {
+              throw new HoodieIOException(e.getMessage(), e);
+            }
+          });
+
+          return true;
+        }).collect();
   }
 
   /**
@@ -410,72 +412,54 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
    * @param consistencyCheckEnabled Consistency Check Enabled
    * @throws HoodieIOException
    */
-  protected void cleanFailedWrites(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats,
-      boolean consistencyCheckEnabled) throws HoodieIOException {
+  protected void reconcileAgainstMarkers(JavaSparkContext jsc,
+                                         String instantTs,
+                                         List<HoodieWriteStat> stats,
+                                         boolean consistencyCheckEnabled) throws HoodieIOException {
     try {
       // Reconcile marker and data files with WriteStats so that partially written data-files due to failed
       // (but succeeded on retry) tasks are removed.
       String basePath = getMetaClient().getBasePath();
-      FileSystem fs = getMetaClient().getFs();
-      Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+      MarkerFiles markers = new MarkerFiles(this, instantTs);
 
-      if (!fs.exists(markerDir)) {
-        // Happens when all writes are appends
+      if (!markers.doesMarkerDirExist()) {
+        // can happen if it was an empty write say.
         return;
       }
 
-      final String baseFileExtension = getBaseFileFormat().getFileExtension();
-      List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString(),
-          baseFileExtension);
-      List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
-          .filter(p -> p.endsWith(baseFileExtension)).collect(Collectors.toList());
+      // we are not including log appends here, since they are already fail-safe.
+      List<String> invalidDataPaths = markers.createdAndMergedDataPaths();
+      List<String> validDataPaths = stats.stream()
+          .map(HoodieWriteStat::getPath)
+          .filter(p -> p.endsWith(this.getBaseFileExtension()))
+          .collect(Collectors.toList());
       // Contains list of partially created files. These needs to be cleaned up.
       invalidDataPaths.removeAll(validDataPaths);
       if (!invalidDataPaths.isEmpty()) {
-        LOG.info(
-            "Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
+        LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
       }
+      Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
+          .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
+          .collect(Collectors.groupingBy(Pair::getKey));
 
-      Map<String, List<Pair<String, String>>> groupByPartition = invalidDataPaths.stream()
-          .map(dp -> Pair.of(new Path(dp).getParent().toString(), dp)).collect(Collectors.groupingBy(Pair::getKey));
-
-      if (!groupByPartition.isEmpty()) {
+      if (!invalidPathsByPartition.isEmpty()) {
         // Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
         // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
         if (consistencyCheckEnabled) {
           // This will either ensure all files to be deleted are present.
-          waitForAllFiles(jsc, groupByPartition, FileVisibility.APPEAR);
+          waitForAllFiles(jsc, invalidPathsByPartition, FileVisibility.APPEAR);
         }
 
         // Now delete partially written files
         jsc.setJobGroup(this.getClass().getSimpleName(), "Delete all partially written files");
-        jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
-            .map(partitionWithFileList -> {
-              final FileSystem fileSystem = metaClient.getFs();
-              LOG.info("Deleting invalid data files=" + partitionWithFileList);
-              if (partitionWithFileList.isEmpty()) {
-                return true;
-              }
-              // Delete
-              partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
-                try {
-                  fileSystem.delete(new Path(file), false);
-                } catch (IOException e) {
-                  throw new HoodieIOException(e.getMessage(), e);
-                }
-              });
-
-              return true;
-            }).collect();
+        deleteInvalidFilesByPartitions(jsc, invalidPathsByPartition);
 
         // Now ensure the deleted files disappear
         if (consistencyCheckEnabled) {
           // This will either ensure all files to be deleted are absent.
-          waitForAllFiles(jsc, groupByPartition, FileVisibility.DISAPPEAR);
+          waitForAllFiles(jsc, invalidPathsByPartition, FileVisibility.DISAPPEAR);
         }
       }
-      // Now delete the marker directory
-      deleteMarkerDir(instantTs);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index c94f30b..98d3e05 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -19,6 +19,12 @@
 package org.apache.hudi.table;
 
 import org.apache.hadoop.conf.Configuration;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -46,12 +52,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -67,6 +67,9 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
  */
@@ -75,14 +78,20 @@ public class HoodieTimelineArchiveLog {
   private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class);
 
   private final Path archiveFilePath;
-  private final HoodieTableMetaClient metaClient;
   private final HoodieWriteConfig config;
   private Writer writer;
+  private final int maxInstantsToKeep;
+  private final int minInstantsToKeep;
+  private final HoodieTable<?> table;
+  private final HoodieTableMetaClient metaClient;
 
-  public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
+  public HoodieTimelineArchiveLog(HoodieWriteConfig config, Configuration configuration) {
     this.config = config;
-    this.metaClient = metaClient;
+    this.table = HoodieTable.create(config, configuration);
+    this.metaClient = table.getMetaClient();
     this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
+    this.maxInstantsToKeep = config.getMaxCommitsToKeep();
+    this.minInstantsToKeep = config.getMinCommitsToKeep();
   }
 
   private Writer openWriter() {
@@ -112,9 +121,9 @@ public class HoodieTimelineArchiveLog {
   /**
    * Check if commits need to be archived. If yes, archive commits.
    */
-  public boolean archiveIfRequired(final Configuration hadoopConf) throws IOException {
+  public boolean archiveIfRequired() throws IOException {
     try {
-      List<HoodieInstant> instantsToArchive = getInstantsToArchive(hadoopConf).collect(Collectors.toList());
+      List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
 
       boolean success = true;
       if (!instantsToArchive.isEmpty()) {
@@ -133,28 +142,21 @@ public class HoodieTimelineArchiveLog {
     }
   }
 
-  private Stream<HoodieInstant> getInstantsToArchive(Configuration hadoopConf) {
-
-    // TODO : rename to max/minInstantsToKeep
-    int maxCommitsToKeep = config.getMaxCommitsToKeep();
-    int minCommitsToKeep = config.getMinCommitsToKeep();
-
-    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
-
-    // GroupBy each action and limit each action timeline to maxCommitsToKeep
-    // TODO: Handle ROLLBACK_ACTION in future
-    // ROLLBACK_ACTION is currently not defined in HoodieActiveTimeline
+  private Stream<HoodieInstant> getCleanInstantsToArchive() {
     HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
         .getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
-    Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
-        .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> {
-          if (hoodieInstants.size() > maxCommitsToKeep) {
-            return hoodieInstants.subList(0, hoodieInstants.size() - minCommitsToKeep);
+    return cleanAndRollbackTimeline.getInstants()
+        .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
+        .map(hoodieInstants -> {
+          if (hoodieInstants.size() > this.maxInstantsToKeep) {
+            return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
           } else {
             return new ArrayList<HoodieInstant>();
           }
         }).flatMap(Collection::stream);
+  }
 
+  private Stream<HoodieInstant> getCommitInstantsToArchive() {
     // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
     // with logic above to avoid Stream.concats
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -164,17 +166,26 @@ public class HoodieTimelineArchiveLog {
     // We cannot have any holes in the commit timeline. We cannot archive any commits which are
     // made after the first savepoint present.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
-    if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) {
+    if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
       // Actually do the commits
-      instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> {
-        // if no savepoint present, then dont filter
-        return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(),
-            HoodieTimeline.LESSER_THAN_OR_EQUALS, s.getTimestamp()));
-      }).filter(s -> {
-        // Ensure commits >= oldest pending compaction commit is retained
-        return oldestPendingCompactionInstant.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, s.getTimestamp())).orElse(true);
-      }).limit(commitTimeline.countInstants() - minCommitsToKeep));
+      return commitTimeline.getInstants()
+          .filter(s -> {
+            // if no savepoint present, then dont filter
+            return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+          }).filter(s -> {
+            // Ensure commits >= oldest pending compaction commit is retained
+            return oldestPendingCompactionInstant
+                .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                .orElse(true);
+          }).limit(commitTimeline.countInstants() - minInstantsToKeep);
+    } else {
+      return Stream.empty();
     }
+  }
+
+  private Stream<HoodieInstant> getInstantsToArchive() {
+    // TODO: Handle ROLLBACK_ACTION in future
+    Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
 
     // For archiving and cleaning instants, we need to include intermediate state files if they exist
     HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
@@ -243,7 +254,7 @@ public class HoodieTimelineArchiveLog {
 
     List<HoodieInstant> instantsToBeDeleted =
         instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
-            HoodieTimeline.LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
+            LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
 
     for (HoodieInstant deleteInstant : instantsToBeDeleted) {
       LOG.info("Deleting instant " + deleteInstant + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath());
@@ -264,6 +275,7 @@ public class HoodieTimelineArchiveLog {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
+          deleteAnyLeftOverMarkerFiles(hoodieInstant);
           records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
           if (records.size() >= this.config.getCommitArchivalBatchSize()) {
             writeToFile(wrapperSchema, records);
@@ -281,8 +293,11 @@ public class HoodieTimelineArchiveLog {
     }
   }
 
-  public Path getArchiveFilePath() {
-    return archiveFilePath;
+  private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) {
+    MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp());
+    if (markerFiles.deleteMarkerDir()) {
+      LOG.info("Cleaned up left over marker directory for instant :" + instant);
+    }
   }
 
   private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
new file mode 100644
index 0000000..00eb7df
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit, compaction).
+ */
+public class MarkerFiles {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+  }
+
+  private final String instantTime;
+  private final FileSystem fs;
+  private final Path markerDirPath;
+  private final String basePath;
+
+  public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
+    this.instantTime = instantTime;
+    this.fs = fs;
+    this.markerDirPath = new Path(markerFolderPath);
+    this.basePath = basePath;
+  }
+
+  public MarkerFiles(HoodieTable<?> table, String instantTime) {
+    this(table.getMetaClient().getFs(),
+        table.getMetaClient().getBasePath(),
+        table.getMetaClient().getMarkerFolderPath(instantTime),
+        instantTime);
+  }
+
+  public void quietDeleteMarkerDir() {
+    try {
+      deleteMarkerDir();
+    } catch (HoodieIOException ioe) {
+      LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
+    }
+  }
+
+  /**
+   * Delete Marker directory corresponding to an instant.
+   */
+  public boolean deleteMarkerDir() {
+    try {
+      boolean result = fs.delete(markerDirPath, true);
+      if (result) {
+        LOG.info("Removing marker directory at " + markerDirPath);
+      } else {
+        LOG.info("No marker directory to delete at " + markerDirPath);
+      }
+      return result;
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  public boolean doesMarkerDirExist() throws IOException {
+    return fs.exists(markerDirPath);
+  }
+
+  public List<String> createdAndMergedDataPaths() throws IOException {
+    List<String> dataFiles = new LinkedList<>();
+    FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
+      String pathStr = status.getPath().toString();
+      if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+        dataFiles.add(translateMarkerToDataPath(pathStr));
+      }
+      return true;
+    }, false);
+    return dataFiles;
+  }
+
+  private String translateMarkerToDataPath(String markerPath) {
+    String rPath = stripMarkerFolderPrefix(markerPath);
+    return MarkerFiles.stripMarkerSuffix(rPath);
+  }
+
+  public List<String> allMarkerFilePaths() throws IOException {
+    List<String> markerFiles = new ArrayList<>();
+    FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
+      markerFiles.add(stripMarkerFolderPrefix(fileStatus.getPath().toString()));
+      return true;
+    }, false);
+    return markerFiles;
+  }
+
+  private String stripMarkerFolderPrefix(String fullMarkerPath) {
+    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
+    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
+        new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
+    int begin = fullMarkerPath.indexOf(markerRootPath);
+    ValidationUtils.checkArgument(begin >= 0,
+        "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath);
+    return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
+  }
+
+  /**
+   * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
+   */
+  public Path create(String partitionPath, String dataFileName, IOType type) {
+    Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath);
+    try {
+      fs.mkdirs(path); // create a new partition as needed.
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + path, e);
+    }
+    String markerFileName = String.format("%s%s.%s", dataFileName, HoodieTableMetaClient.MARKER_EXTN, type.name());
+    Path markerPath = new Path(path, markerFileName);
+    try {
+      LOG.info("Creating Marker Path=" + markerPath);
+      fs.create(markerPath, false).close();
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create marker file " + markerPath, e);
+    }
+    return markerPath;
+  }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index 53e94c6..2c9c2bc 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -48,7 +48,8 @@ public class CopyOnWriteRestoreActionExecutor extends BaseRestoreActionExecutor
         HoodieActiveTimeline.createNewInstantTime(),
         instantToRollback,
         true,
-        true);
+        true,
+        false);
     if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
       throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index 8040861..827d8e2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -47,7 +47,8 @@ public class MergeOnReadRestoreActionExecutor extends BaseRestoreActionExecutor
         HoodieActiveTimeline.createNewInstantTime(),
         instantToRollback,
         true,
-        true);
+        true,
+        false);
 
     switch (instantToRollback.getAction()) {
       case HoodieTimeline.COMMIT_ACTION:
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index bb12bdd..846e8a8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -27,16 +27,19 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -46,9 +49,14 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
 
   private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class);
 
+  interface RollbackStrategy extends Serializable {
+    List<HoodieRollbackStat> execute(HoodieInstant instantToRollback);
+  }
+
   protected final HoodieInstant instantToRollback;
   protected final boolean deleteInstants;
   protected final boolean skipTimelinePublish;
+  protected final boolean useMarkerBasedStrategy;
 
   public BaseRollbackActionExecutor(JavaSparkContext jsc,
                                     HoodieWriteConfig config,
@@ -56,7 +64,8 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
                                     String instantTime,
                                     HoodieInstant instantToRollback,
                                     boolean deleteInstants) {
-    this(jsc, config, table, instantTime, instantToRollback, deleteInstants, false);
+    this(jsc, config, table, instantTime, instantToRollback, deleteInstants,
+        false, config.shouldRollbackUsingMarkers());
   }
 
   public BaseRollbackActionExecutor(JavaSparkContext jsc,
@@ -65,76 +74,105 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
                                     String instantTime,
                                     HoodieInstant instantToRollback,
                                     boolean deleteInstants,
-                                    boolean skipTimelinePublish) {
+                                    boolean skipTimelinePublish,
+                                    boolean useMarkerBasedStrategy) {
     super(jsc, config, table, instantTime);
     this.instantToRollback = instantToRollback;
     this.deleteInstants = deleteInstants;
     this.skipTimelinePublish = skipTimelinePublish;
+    this.useMarkerBasedStrategy = useMarkerBasedStrategy;
+    if (useMarkerBasedStrategy) {
+      ValidationUtils.checkArgument(!instantToRollback.isCompleted(),
+              "Cannot use marker based rollback strategy on completed instant:" + instantToRollback);
+    }
+  }
+
+  protected RollbackStrategy getRollbackStrategy() {
+    if (useMarkerBasedStrategy) {
+      return new MarkerBasedRollbackStrategy(table, jsc, config, instantTime);
+    } else {
+      return this::executeRollbackUsingFileListing;
+    }
   }
 
   protected abstract List<HoodieRollbackStat> executeRollback() throws IOException;
 
+  protected abstract List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback);
+
   @Override
   public HoodieRollbackMetadata execute() {
     HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
+    List<HoodieRollbackStat> stats = doRollbackAndGetStats();
     HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
         instantTime,
         Option.of(rollbackTimer.endTimer()),
         Collections.singletonList(instantToRollback.getTimestamp()),
-        doRollbackAndGetStats());
+        stats);
     if (!skipTimelinePublish) {
       finishRollback(rollbackMetadata);
     }
+
+    // Finally, remove the marker files post rollback.
+    new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir();
+
     return rollbackMetadata;
   }
 
-  public List<HoodieRollbackStat> doRollbackAndGetStats() {
-    final String instantTimeToRollback = instantToRollback.getTimestamp();
-    final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
-        && !instantToRollback.isCompleted();
-    HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
-    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+  private void validateSavepointRollbacks() {
     // Check if any of the commits is a savepoint - do not allow rollback on those commits
     List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
         .map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
     savepoints.forEach(s -> {
-      if (s.contains(instantTimeToRollback)) {
+      if (s.contains(instantToRollback.getTimestamp())) {
         throw new HoodieRollbackException(
             "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
       }
     });
+  }
 
-    if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) {
-      LOG.info("No commits to rollback " + instantTimeToRollback);
-    }
-
+  private void validateRollbackCommitSequence() {
+    final String instantTimeToRollback = instantToRollback.getTimestamp();
+    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+    HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
     // Make sure only the last n commits are being rolled back
     // If there is a commit in-between or after that is not rolled back, then abort
-    if (!isPendingCompaction) {
-      if ((instantTimeToRollback != null) && !commitTimeline.empty()
-          && !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) {
-        throw new HoodieRollbackException(
-            "Found commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
-      }
+    if ((instantTimeToRollback != null) && !commitTimeline.empty()
+        && !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) {
+      throw new HoodieRollbackException(
+          "Found commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
+    }
 
-      List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
-          .collect(Collectors.toList());
-      if ((instantTimeToRollback != null) && !inflights.isEmpty()
-          && (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
-        throw new HoodieRollbackException(
-            "Found in-flight commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
-      }
+    List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+    if ((instantTimeToRollback != null) && !inflights.isEmpty()
+        && (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
+      throw new HoodieRollbackException(
+          "Found in-flight commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
+    }
+  }
+
+  private void rollBackIndex() {
+    if (!table.getIndex().rollbackCommit(instantToRollback.getTimestamp())) {
+      throw new HoodieRollbackException("Rollback index changes failed, for time :" + instantToRollback);
+    }
+    LOG.info("Index rolled back for commits " + instantToRollback);
+  }
+
+  public List<HoodieRollbackStat> doRollbackAndGetStats() {
+    final String instantTimeToRollback = instantToRollback.getTimestamp();
+    final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
+        && !instantToRollback.isCompleted();
+    validateSavepointRollbacks();
+    if (!isPendingCompaction) {
+      validateRollbackCommitSequence();
     }
 
     try {
       List<HoodieRollbackStat> stats = executeRollback();
       LOG.info("Rolled back inflight instant " + instantTimeToRollback);
       if (!isPendingCompaction) {
-        if (!table.getIndex().rollbackCommit(instantTimeToRollback)) {
-          throw new HoodieRollbackException("Rollback index changes failed, for time :" + instantTimeToRollback);
-        }
-        LOG.info("Index rolled back for commits " + instantTimeToRollback);
+        rollBackIndex();
       }
       return stats;
     } catch (IOException e) {
@@ -171,9 +209,6 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
   protected void deleteInflightAndRequestedInstant(boolean deleteInstant,
                                                    HoodieActiveTimeline activeTimeline,
                                                    HoodieInstant instantToBeDeleted) {
-    // Remove marker files always on rollback
-    table.deleteMarkerDir(instantToBeDeleted.getTimestamp());
-
     // Remove the rolled back inflight commits
     if (deleteInstant) {
       LOG.info("Deleting instant=" + instantToBeDeleted);
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index 3d6c123..f2b6978 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -22,7 +22,9 @@ import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -52,13 +54,16 @@ public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecuto
                                            String instantTime,
                                            HoodieInstant commitInstant,
                                            boolean deleteInstants,
-                                           boolean skipTimelinePublish) {
-    super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish);
+                                           boolean skipTimelinePublish,
+                                           boolean useMarkerBasedStrategy) {
+    super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
   }
 
   @Override
-  protected List<HoodieRollbackStat> executeRollback() throws IOException {
-    long startTime = System.currentTimeMillis();
+  protected List<HoodieRollbackStat> executeRollback() {
+    HoodieTimer rollbackTimer = new HoodieTimer();
+    rollbackTimer.startTimer();
+
     List<HoodieRollbackStat> stats = new ArrayList<>();
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieInstant resolvedInstant = instantToRollback;
@@ -74,23 +79,29 @@ public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecuto
     // deleting the timeline file
     if (!resolvedInstant.isRequested()) {
       // delete all the data files for this commit
-      LOG.info("Clean out all parquet files generated for commit: " + resolvedInstant);
-      List<RollbackRequest> rollbackRequests = generateRollbackRequests(resolvedInstant);
-
-      //TODO: We need to persist this as rollback workload and use it in case of partial failures
-      stats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests);
+      LOG.info("Clean out all base files generated for commit: " + resolvedInstant);
+      stats = getRollbackStrategy().execute(resolvedInstant);
     }
     // Delete Inflight instant if enabled
     deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, resolvedInstant);
-    LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
+    LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
     return stats;
   }
 
-  private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
-      throws IOException {
-    return FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning()).stream()
-        .map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
-        .collect(Collectors.toList());
+  private List<ListingBasedRollbackRequest> generateRollbackRequestsByListing() {
+    try {
+      return FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
+          config.shouldAssumeDatePartitioning()).stream()
+          .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new HoodieIOException("Error generating rollback requests", e);
+    }
+  }
+
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
+    List<ListingBasedRollbackRequest> rollbackRequests = generateRollbackRequestsByListing();
+    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, instantToRollback, rollbackRequests);
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
similarity index 71%
rename from hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index f5b92e7..d7304fa 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -28,9 +28,8 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -42,8 +41,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,14 +52,14 @@ import scala.Tuple2;
 /**
  * Performs Rollback of Hoodie Tables.
  */
-public class RollbackHelper implements Serializable {
+public class ListingBasedRollbackHelper implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(RollbackHelper.class);
+  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
 
   private final HoodieTableMetaClient metaClient;
   private final HoodieWriteConfig config;
 
-  public RollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
     this.metaClient = metaClient;
     this.config = config;
   }
@@ -69,14 +67,12 @@ public class RollbackHelper implements Serializable {
   /**
    * Performs all rollback actions that we have collected in parallel.
    */
-  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
-
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+  public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
     SerializablePathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
+      if (path.toString().endsWith(this.metaClient.getTableConfig().getBaseFileFormat().getFileExtension())) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return instantToRollback.getTimestamp().equals(fileCommitTime);
-      } else if (path.toString().contains(".log")) {
+      } else if (FSUtils.isLogFile(path)) {
         // Since the baseCommitTime is the only commit for new log files, it's okay here
         String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
         return instantToRollback.getTimestamp().equals(fileCommitTime);
@@ -87,17 +83,16 @@ public class RollbackHelper implements Serializable {
     int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
     jsc.setJobGroup(this.getClass().getSimpleName(), "Perform rollback actions");
     return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
-      final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
-      switch (rollbackRequest.getRollbackAction()) {
+      switch (rollbackRequest.getType()) {
         case DELETE_DATA_FILES_ONLY: {
-          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
+          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteCleanedFiles(metaClient, config, instantToRollback.getTimestamp(),
               rollbackRequest.getPartitionPath());
           return new Tuple2<>(rollbackRequest.getPartitionPath(),
                   HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                           .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case DELETE_DATA_AND_LOG_FILES: {
-          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
+          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteCleanedFiles(metaClient, config, rollbackRequest.getPartitionPath(), filter);
           return new Tuple2<>(rollbackRequest.getPartitionPath(),
                   HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                           .withDeletedFileResults(filesToDeletedStatus).build());
@@ -123,15 +118,17 @@ public class RollbackHelper implements Serializable {
                 writer.close();
               }
             } catch (IOException io) {
-              throw new UncheckedIOException(io);
+              throw new HoodieIOException("Error appending rollback block..", io);
             }
           }
 
           // This step is intentionally done after writer is closed. Guarantees that
           // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
           // cloud-storage : HUDI-168
-          Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
-          filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L);
+          Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
+              metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+              1L
+          );
           return new Tuple2<>(rollbackRequest.getPartitionPath(),
                   HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                           .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
@@ -139,36 +136,18 @@ public class RollbackHelper implements Serializable {
         default:
           throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
       }
-    }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
+    }).reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
   }
 
-  /**
-   * Helper to merge 2 rollback-stats for a given partition.
-   *
-   * @param stat1 HoodieRollbackStat
-   * @param stat2 HoodieRollbackStat
-   * @return Merged HoodieRollbackStat
-   */
-  private HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) {
-    ValidationUtils.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath()));
-    final List<String> successDeleteFiles = new ArrayList<>();
-    final List<String> failedDeleteFiles = new ArrayList<>();
-    final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
-    Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
-    Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
-    Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
-    Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
-    Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
-    Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
-    return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
-  }
+
 
   /**
-   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits.
+   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
    */
   private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-      Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {
+                                                      String partitionPath, PathFilter filter) throws IOException {
     LOG.info("Cleaning path " + partitionPath);
+    final Map<FileStatus, Boolean> results = new HashMap<>();
     FileSystem fs = metaClient.getFs();
     FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
     for (FileStatus file : toBeDeleted) {
@@ -180,10 +159,11 @@ public class RollbackHelper implements Serializable {
   }
 
   /**
-   * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits.
+   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
    */
   private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-      Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
+                                                      String commit, String partitionPath) throws IOException {
+    final Map<FileStatus, Boolean> results = new HashMap<>();
     LOG.info("Cleaning path " + partitionPath);
     FileSystem fs = metaClient.getFs();
     String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
@@ -214,6 +194,5 @@ public class RollbackHelper implements Serializable {
   }
 
   public interface SerializablePathFilter extends PathFilter, Serializable {
-
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackRequest.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java
similarity index 51%
rename from hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackRequest.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java
index 71cb57d..fc369a4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackRequest.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java
@@ -18,18 +18,17 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 
 /**
  * Request for performing one rollback action.
  */
-public class RollbackRequest {
+public class ListingBasedRollbackRequest {
 
   /**
-   * Rollback Action Types.
+   * Rollback commands, that trigger a specific handling for rollback.
    */
-  public enum RollbackAction {
+  public enum Type {
     DELETE_DATA_FILES_ONLY,
     DELETE_DATA_AND_LOG_FILES,
     APPEND_ROLLBACK_BLOCK
@@ -41,11 +40,6 @@ public class RollbackRequest {
   private final String partitionPath;
 
   /**
-   * Rollback Instant.
-   */
-  private final HoodieInstant rollbackInstant;
-
-  /**
    * FileId in case of appending rollback block.
    */
   private final Option<String> fileId;
@@ -55,46 +49,38 @@ public class RollbackRequest {
    */
   private final Option<String> latestBaseInstant;
 
-  /**
-   * Rollback Action.
-   */
-  private final RollbackAction rollbackAction;
+  private final Type type;
 
-  public RollbackRequest(String partitionPath, HoodieInstant rollbackInstant, Option<String> fileId,
-      Option<String> latestBaseInstant, RollbackAction rollbackAction) {
+  public ListingBasedRollbackRequest(String partitionPath,
+                                     Option<String> fileId,
+                                     Option<String> latestBaseInstant,
+                                     Type type) {
     this.partitionPath = partitionPath;
-    this.rollbackInstant = rollbackInstant;
     this.fileId = fileId;
     this.latestBaseInstant = latestBaseInstant;
-    this.rollbackAction = rollbackAction;
+    this.type = type;
   }
 
-  public static RollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath,
-      HoodieInstant rollbackInstant) {
-    return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
-        RollbackAction.DELETE_DATA_FILES_ONLY);
+  public static ListingBasedRollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath) {
+    return new ListingBasedRollbackRequest(partitionPath, Option.empty(), Option.empty(),
+        Type.DELETE_DATA_FILES_ONLY);
   }
 
-  public static RollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath,
-      HoodieInstant rollbackInstant) {
-    return new RollbackRequest(partitionPath, rollbackInstant, Option.empty(), Option.empty(),
-        RollbackAction.DELETE_DATA_AND_LOG_FILES);
+  public static ListingBasedRollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath) {
+    return new ListingBasedRollbackRequest(partitionPath, Option.empty(), Option.empty(),
+        Type.DELETE_DATA_AND_LOG_FILES);
   }
 
-  public static RollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId,
-      String baseInstant, HoodieInstant rollbackInstant) {
-    return new RollbackRequest(partitionPath, rollbackInstant, Option.of(fileId), Option.of(baseInstant),
-        RollbackAction.APPEND_ROLLBACK_BLOCK);
+  public static ListingBasedRollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId,
+                                                                                               String baseInstant) {
+    return new ListingBasedRollbackRequest(partitionPath, Option.of(fileId), Option.of(baseInstant),
+        Type.APPEND_ROLLBACK_BLOCK);
   }
 
   public String getPartitionPath() {
     return partitionPath;
   }
 
-  public HoodieInstant getRollbackInstant() {
-    return rollbackInstant;
-  }
-
   public Option<String> getFileId() {
     return fileId;
   }
@@ -103,7 +89,7 @@ public class RollbackRequest {
     return latestBaseInstant;
   }
 
-  public RollbackAction getRollbackAction() {
-    return rollbackAction;
+  public Type getType() {
+    return type;
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
new file mode 100644
index 0000000..40b81a2
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.io.IOType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Performs rollback using marker files generated during the write..
+ */
+public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
+
+  private final HoodieTable<?> table;
+
+  private final transient JavaSparkContext jsc;
+
+  private final HoodieWriteConfig config;
+
+  private final String basePath;
+
+  private final String instantTime;
+
+  public MarkerBasedRollbackStrategy(HoodieTable<?> table, JavaSparkContext jsc, HoodieWriteConfig config, String instantTime) {
+    this.table = table;
+    this.jsc = jsc;
+    this.basePath = table.getMetaClient().getBasePath();
+    this.config = config;
+    this.instantTime = instantTime;
+  }
+
+  private HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the merged base file:" + mergedBaseFilePath);
+    return deleteBaseFile(mergedBaseFilePath);
+  }
+
+  private HoodieRollbackStat undoCreate(String createdBaseFilePath) throws IOException {
+    LOG.info("Rolling back by deleting the created base file:" + createdBaseFilePath);
+    return deleteBaseFile(createdBaseFilePath);
+  }
+
+  private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws IOException {
+    Path fullDeletePath = new Path(basePath, baseFilePath);
+    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
+    boolean isDeleted = table.getMetaClient().getFs().delete(fullDeletePath);
+    return HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath)
+        .withDeletedFileResult(baseFilePath, isDeleted)
+        .build();
+  }
+
+  private HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant instantToRollback) throws IOException, InterruptedException {
+    Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
+    String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
+    String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
+    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
+
+    HoodieLogFormat.Writer writer = null;
+    try {
+      Path partitionFullPath = FSUtils.getPartitionPath(basePath, partitionPath);
+
+      if (!table.getMetaClient().getFs().exists(partitionFullPath)) {
+        return HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partitionPath)
+            .build();
+      }
+      writer = HoodieLogFormat.newWriterBuilder()
+          .onParentPath(partitionFullPath)
+          .withFileId(fileId)
+          .overBaseCommit(baseCommitTime)
+          .withFs(table.getMetaClient().getFs())
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+      // generate metadata
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime);
+      // if update belongs to an existing log file
+      writer = writer.appendBlock(new HoodieCommandBlock(header));
+    } finally {
+      try {
+        if (writer != null) {
+          writer.close();
+        }
+      } catch (IOException io) {
+        throw new HoodieIOException("Error closing append of rollback block..", io);
+      }
+    }
+
+    return HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath)
+        // we don't use this field per se. Avoiding the extra file status call.
+        .withRollbackBlockAppendResults(Collections.emptyMap())
+        .build();
+  }
+
+  @Override
+  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
+    try {
+      MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
+      List<String> markerFilePaths = markerFiles.allMarkerFilePaths();
+      int parallelism = Math.max(Math.min(markerFilePaths.size(), config.getRollbackParallelism()), 1);
+      return jsc.parallelize(markerFilePaths, parallelism)
+          .map(markerFilePath -> {
+            String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+            IOType type = IOType.valueOf(typeStr);
+            switch (type) {
+              case MERGE:
+                return undoMerge(MarkerFiles.stripMarkerSuffix(markerFilePath));
+              case APPEND:
+                return undoAppend(MarkerFiles.stripMarkerSuffix(markerFilePath), instantToRollback);
+              case CREATE:
+                return undoCreate(MarkerFiles.stripMarkerSuffix(markerFilePath));
+              default:
+                throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
+            }
+          })
+          .mapToPair(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
+          .reduceByKey(RollbackUtils::mergeRollbackStat)
+          .map(Tuple2::_2).collect();
+    } catch (Exception e) {
+      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 481fb2d..d11b7ad 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -26,15 +26,16 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -60,14 +61,17 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
                                            String instantTime,
                                            HoodieInstant commitInstant,
                                            boolean deleteInstants,
-                                           boolean skipTimelinePublish) {
-    super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish);
+                                           boolean skipTimelinePublish,
+                                           boolean useMarkerBasedStrategy) {
+    super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
   }
 
   @Override
   protected List<HoodieRollbackStat> executeRollback() throws IOException {
-    long startTime = System.currentTimeMillis();
-    LOG.info("Rolling back instant " + instantToRollback.getTimestamp());
+    HoodieTimer rollbackTimer = new HoodieTimer();
+    rollbackTimer.startTimer();
+
+    LOG.info("Rolling back instant " + instantToRollback);
 
     HoodieInstant resolvedInstant = instantToRollback;
     // Atomically un-publish all non-inflight commits
@@ -85,35 +89,40 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
     // (commitToRollback).
     // NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
     // required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
-    // Atomically un-publish all non-inflight commits
+
+
     // For Requested State (like failure during index lookup), there is nothing to do rollback other than
     // deleting the timeline file
     if (!resolvedInstant.isRequested()) {
-      LOG.info("Un-published " + resolvedInstant);
-      List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, resolvedInstant);
-      // TODO: We need to persist this as rollback workload and use it in case of partial failures
-      allRollbackStats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests);
+      LOG.info("Unpublished " + resolvedInstant);
+      allRollbackStats = getRollbackStrategy().execute(resolvedInstant);
     }
 
     // Delete Inflight instants if enabled
     deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant);
-
-    LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
-
+    LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
     return allRollbackStats;
   }
 
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
+    List<ListingBasedRollbackRequest> rollbackRequests;
+    try {
+      rollbackRequests = generateRollbackRequestsUsingFileListing(resolvedInstant);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
+    }
+    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests);
+  }
+
   /**
    * Generate all rollback requests that we need to perform for rolling back this action without actually performing
    * rolling back.
    *
-   * @param jsc JavaSparkContext
    * @param instantToRollback Instant to Rollback
    * @return list of rollback requests
-   * @throws IOException
    */
-  private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
-      throws IOException {
+  private List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListing(HoodieInstant instantToRollback) throws IOException {
     String commit = instantToRollback.getTimestamp();
     List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
         config.shouldAssumeDatePartitioning());
@@ -121,12 +130,12 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
     jsc.setJobGroup(this.getClass().getSimpleName(), "Generate all rollback requests");
     return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
       HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
-      List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
+      List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
       switch (instantToRollback.getAction()) {
         case HoodieTimeline.COMMIT_ACTION:
-          LOG.info("Rolling back commit action. There are higher delta commits. So only rolling back this instant");
+          LOG.info("Rolling back commit action.");
           partitionRollbackRequests.add(
-              RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
+              ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
           break;
         case HoodieTimeline.COMPACTION_ACTION:
           // If there is no delta commit present after the current commit (if compaction), no action, else we
@@ -141,7 +150,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
             // have been written to the log files.
             LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
             partitionRollbackRequests.add(
-                RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback));
+                ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath));
           } else {
             // No deltacommits present after this compaction commit (inflight or requested). In this case, we
             // can also delete any log files that were created with this compaction commit as base
@@ -149,7 +158,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
             LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
                 + " log files");
             partitionRollbackRequests.add(
-                RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
+                ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
           }
           break;
         case HoodieTimeline.DELTA_COMMIT_ACTION:
@@ -179,8 +188,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
           try {
             HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                 table.getMetaClient().getCommitTimeline()
-                    .getInstantDetails(
-                        new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
+                    .getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
                     .get(),
                 HoodieCommitMetadata.class);
 
@@ -188,7 +196,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
             // We do not know fileIds for inserts (first inserts are either log files or parquet files),
             // delete all files for the corresponding failed commit, if present (same as COW)
             partitionRollbackRequests.add(
-                RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
+                ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
 
             // append rollback blocks for updates
             if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
@@ -197,7 +205,7 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
             }
             break;
           } catch (IOException io) {
-            throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
+            throw new HoodieIOException("Failed to collect rollback actions for commit " + commit, io);
           }
         default:
           break;
@@ -206,8 +214,8 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
     }).filter(Objects::nonNull).collect();
   }
 
-  private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
-                                                                   HoodieCommitMetadata commitMetadata) {
+  private List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
+                                                                               HoodieCommitMetadata commitMetadata) {
     ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
 
     // wStat.getPrevCommit() might not give the right commit time in the following
@@ -236,8 +244,8 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
           wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
     }).map(wStat -> {
       String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
-      return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
-          baseCommitTime, rollbackInstant);
+      return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
+          baseCommitTime);
     }).collect(Collectors.toList());
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
new file mode 100644
index 0000000..cbb5a2c
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RollbackUtils {
+
+  static Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String instantToRollback, String rollbackInstantTime) {
+    // generate metadata
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackInstantTime);
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, instantToRollback);
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
+  /**
+   * Helper to merge 2 rollback-stats for a given partition.
+   *
+   * @param stat1 HoodieRollbackStat
+   * @param stat2 HoodieRollbackStat
+   * @return Merged HoodieRollbackStat
+   */
+  static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) {
+    ValidationUtils.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath()));
+    final List<String> successDeleteFiles = new ArrayList<>();
+    final List<String> failedDeleteFiles = new ArrayList<>();
+    final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
+    Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
+    Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
+    Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
+    Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
+    Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
+    Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
+    return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+  }
+
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 5518a3f..e9a70a8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -44,7 +44,9 @@ import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.commit.WriteHelper;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -1063,11 +1065,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
   }
 
-  @Test
-  public void testRollbackAfterConsistencyCheckFailure() throws Exception {
+  private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers) throws Exception {
     String instantTime = "000";
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
-    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+    HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false).build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
     testConsistencyCheck(metaClient, instantTime);
 
@@ -1079,6 +1080,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
   }
 
+  @Test
+  public void testRollbackAfterConsistencyCheckFailureUsingFileList() throws Exception {
+    testRollbackAfterConsistencyCheckFailureUsingFileList(false);
+  }
+
+  @Test
+  public void testRollbackAfterConsistencyCheckFailureUsingMarkers() throws Exception {
+    testRollbackAfterConsistencyCheckFailureUsingFileList(true);
+  }
+
   private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime)
       throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
@@ -1096,11 +1107,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // This should fail the commit
     String partitionPath = Arrays
         .stream(fs.globStatus(new Path(String.format("%s/*/*/*/*", metaClient.getMarkerFolderPath(instantTime))),
-            path -> path.toString().endsWith(HoodieTableMetaClient.MARKER_EXTN)))
+            path -> path.toString().contains(HoodieTableMetaClient.MARKER_EXTN)))
         .limit(1).map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0);
-    Path markerFilePath = new Path(String.format("%s/%s", partitionPath,
-        FSUtils.makeMarkerFile(instantTime, "1-0-1", UUID.randomUUID().toString())));
-    metaClient.getFs().create(markerFilePath);
+
+    Path markerFilePath = new MarkerFiles(fs, basePath, metaClient.getMarkerFolderPath(instantTime), instantTime)
+        .create(partitionPath,
+            FSUtils.makeDataFileName(instantTime, "1-0-1", UUID.randomUUID().toString()),
+            IOType.MERGE);
     LOG.info("Created a dummy marker path=" + markerFilePath);
 
     Exception e = assertThrows(HoodieCommitException.class, () -> {
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
similarity index 95%
rename from hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
rename to hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 127cfac..7b2114f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
+public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
   private Configuration hadoopConf;
   private HoodieTableMetaClient metaClient;
@@ -78,8 +78,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
             .withParallelism(2, 2).forTable("test-trip-table").build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-    boolean result = archiveLog.archiveIfRequired(hadoopConf);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
+    boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
   }
 
@@ -156,9 +156,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     verifyInflightInstants(metaClient, 2);
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-
-    assertTrue(archiveLog.archiveIfRequired(hadoopConf));
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
+    assertTrue(archiveLog.archiveIfRequired());
 
     // reload the timeline and remove the remaining commits
     timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -215,7 +214,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
     // Requested Compaction
     HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100"), dfs.getConf());
@@ -247,7 +246,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(hadoopConf);
+    boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");
@@ -280,7 +279,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
@@ -290,7 +289,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(hadoopConf);
+    boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");
@@ -305,8 +304,6 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
     HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf());
@@ -314,11 +311,11 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(hadoopConf);
-    assertTrue(result);
+    assertTrue(archiveLog.archiveIfRequired());
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertEquals(5, timeline.countInstants(),
         "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
@@ -336,8 +333,6 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
     HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -350,10 +345,11 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "106", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "107", dfs.getConf());
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf());
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
     assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(hadoopConf);
+    boolean result = archiveLog.archiveIfRequired();
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
     assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
@@ -378,34 +374,35 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
   }
 
   @Test
-  public void checkArchiveCommitTimeline() throws IOException {
+  public void testArchiveCommitTimeline() throws IOException {
     HoodieWriteConfig cfg =
             HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
                     .withParallelism(2, 2).forTable("test-trip-table")
                     .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
                     .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
 
     HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
     HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
     HoodieTestDataGenerator.createCommitFile(basePath, "2", dfs.getConf());
+    Path markerPath = new Path(metaClient.getMarkerFolderPath("2"));
+    dfs.mkdirs(markerPath);
     HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2");
     HoodieTestDataGenerator.createCommitFile(basePath, "3", dfs.getConf());
     HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
 
     //add 2 more instants to pass filter criteria set in compaction config above
     HoodieTestDataGenerator.createCommitFile(basePath, "4", dfs.getConf());
-    HoodieInstant instant4 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "4");
     HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());
-    HoodieInstant instant5 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5");
 
-    boolean result = archiveLog.archiveIfRequired(hadoopConf);
-    assertTrue(result);
 
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf());
+    boolean result = archiveLog.archiveIfRequired();
+    assertTrue(result);
     HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
     List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
     assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
+    assertFalse(dfs.exists(markerPath));
   }
 
   private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
@@ -425,7 +422,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
 
     org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata);
     assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 3c32bbc..dc36005 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -43,6 +43,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
 import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.FileSystemTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -54,10 +55,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -1128,7 +1129,7 @@ public class TestCleaner extends HoodieClientTestBase {
   private List<String> createMarkerFiles(String instantTime, int numFiles) throws IOException {
     List<String> files = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      files.add(HoodieTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime));
+      files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime));
     }
     return files;
   }
@@ -1140,13 +1141,8 @@ public class TestCleaner extends HoodieClientTestBase {
    * @throws IOException in case of error
    */
   private int getTotalTempFiles() throws IOException {
-    RemoteIterator<?> itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
-    int count = 0;
-    while (itr.hasNext()) {
-      count++;
-      itr.next();
-    }
-    return count;
+    return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME))
+        .size();
   }
 
   private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient,
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 3bc1f5a..1551df1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -387,15 +387,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat) throws Exception {
+  private void testCOWToMORConvertedTableRollback(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
     init(baseFileFormat);
-
     // Set TableType to COW
     HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
 
-    HoodieWriteConfig cfg = getConfig(true);
+    HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       /**
@@ -410,6 +407,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       // verify there are no errors
       assertNoWriteErrors(statuses);
+      client.commit(newCommitTime, jsc.parallelize(statuses));
 
       metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
@@ -446,10 +444,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testCOWToMORConvertedTableRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
+    testCOWToMORConvertedTableRollback(baseFileFormat, false);
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testCOWToMORConvertedTableRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
+    testCOWToMORConvertedTableRollback(baseFileFormat, true);
+  }
+
+  private void testRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
     init(baseFileFormat);
 
-    HoodieWriteConfig cfg = getConfig(false);
+    HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
 
       // Test delta commit rollback
@@ -538,7 +546,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
         writeRecords = jsc.parallelize(copyOfRecords, 1);
         writeStatusJavaRDD = thirdClient.upsert(writeRecords, commitTime2);
-        thirdClient.commit(commitTime2, writeStatusJavaRDD);
         statuses = writeStatusJavaRDD.collect();
         // Verify there are no errors
         assertNoWriteErrors(statuses);
@@ -574,20 +581,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         metaClient = HoodieTableMetaClient.reload(metaClient);
 
         String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
-        JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
-        thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());
+        thirdClient.compact(compactionInstantTime);
 
         allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
         tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
-        final String compactedCommitTime =
-            metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
-
-        assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
-
-        thirdClient.rollback(compactedCommitTime);
-
+        final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
+        assertTrue(Arrays.stream(listAllDataFilesInPath(hoodieTable, cfg.getBasePath()))
+                .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
+        thirdClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
+                hoodieTable);
         allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
         metaClient = HoodieTableMetaClient.reload(metaClient);
         tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -599,6 +603,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
   @ParameterizedTest
   @MethodSource("argumentsProvider")
+  public void testRollbackWithDeltaAndCompactionCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
+    testRollbackWithDeltaAndCompactionCommit(baseFileFormat, false);
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testRollbackWithDeltaAndCompactionCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
+    testRollbackWithDeltaAndCompactionCommit(baseFileFormat, true);
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
   public void testMultiRollbackWithDeltaAndCompactionCommit(HoodieFileFormat baseFileFormat) throws Exception {
     init(baseFileFormat);
 
@@ -960,15 +976,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat) throws Exception {
+  private void testInsertsGeneratedIntoLogFilesRollback(HoodieFileFormat baseFileFormat,
+                                                        Boolean rollbackUsingMarkers) throws Exception {
     init(baseFileFormat);
 
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
-    HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+    HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config)) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
 
@@ -987,14 +1002,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // rollback a failed commit
       boolean rollback = writeClient.rollback(newCommitTime);
       assertTrue(rollback);
-      newCommitTime = "101";
-      writeClient.startCommitWithTime(newCommitTime);
 
       // insert 100 records
+      newCommitTime = "101";
+      writeClient.startCommitWithTime(newCommitTime);
       records = dataGen.generateInserts(newCommitTime, 100);
       recordsRDD = jsc.parallelize(records, 1);
-      statuses = writeClient.insert(recordsRDD, newCommitTime);
-      writeClient.commit(newCommitTime, statuses);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Sleep for small interval (at least 1 second) to force a new rollback start time.
       Thread.sleep(1000);
@@ -1003,19 +1017,27 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // and calling rollback twice
       final String lastCommitTime = newCommitTime;
       metaClient = getHoodieMetaClient(hadoopConf, basePath);
-      HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
-          .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
-      String fileName = last.getFileName();
+
       // Save the .commit file to local directory.
       // Rollback will be called twice to test the case where rollback failed first time and retried.
       // We got the "BaseCommitTime cannot be null" exception before the fix
-      File file = Files.createTempFile(tempFolder, null, null).toFile();
-      metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), fileName),
-          new Path(file.getAbsolutePath()));
-      writeClient.rollback(newCommitTime);
+      Map<String, String> fileNameMap = new HashMap<>();
+      for (State state : Arrays.asList(State.REQUESTED, State.INFLIGHT)) {
+        HoodieInstant toCopy = new HoodieInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime);
+        File file = Files.createTempFile(tempFolder, null, null).toFile();
+        metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), toCopy.getFileName()),
+                new Path(file.getAbsolutePath()));
+        fileNameMap.put(file.getAbsolutePath(), toCopy.getFileName());
+      }
+      Path markerDir = new Path(Files.createTempDirectory(tempFolder,null).toAbsolutePath().toString());
+      if (rollbackUsingMarkers) {
+        metaClient.getFs().copyToLocalFile(new Path(metaClient.getMarkerFolderPath(lastCommitTime)),
+            markerDir);
+      }
 
+      writeClient.rollback(newCommitTime);
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
+      HoodieTable table = HoodieTable.create(config, hadoopConf);
       SliceView tableRTFileSystemView = table.getSliceView();
 
       long numLogFiles = 0;
@@ -1026,22 +1048,43 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
             .filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
       }
       assertEquals(0, numLogFiles);
-      metaClient.getFs().copyFromLocalFile(new Path(file.getAbsolutePath()),
-          new Path(metaClient.getMetaPath(), fileName));
+      fileNameMap.forEach((key, value) -> {
+        try {
+          metaClient.getFs().copyFromLocalFile(new Path(key),
+                  new Path(metaClient.getMetaPath(), value));
+        } catch (IOException e) {
+          throw new HoodieIOException("Error copying state from local disk.", e);
+        }
+      });
+      if (rollbackUsingMarkers) {
+        metaClient.getFs().copyFromLocalFile(markerDir,
+            new Path(metaClient.getMarkerFolderPath(lastCommitTime)));
+      }
       Thread.sleep(1000);
-      // Rollback again to pretend the first rollback failed partially. This should not error our
+      // Rollback again to pretend the first rollback failed partially. This should not error out
       writeClient.rollback(newCommitTime);
     }
   }
 
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testInsertsGeneratedIntoLogFilesRollbackUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
+    testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, false);
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testInsertsGeneratedIntoLogFilesRollbackUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
+    testInsertsGeneratedIntoLogFilesRollback(baseFileFormat, true);
+  }
+
+  private void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(HoodieFileFormat baseFileFormat,
+                                                                       Boolean rollbackUsingMarkers) throws Exception {
     init(baseFileFormat);
 
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
-    HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
+    HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build();
     try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
@@ -1053,8 +1096,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // trigger an action
       statuses.collect();
 
-      HoodieTable table =
-          HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
+      HoodieTable table = HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
       SliceView tableRTFileSystemView = table.getSliceView();
 
       long numLogFiles = 0;
@@ -1072,30 +1114,43 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // Ensure all log files have been compacted into parquet files
       assertEquals(statuses.map(status -> status.getStat().getPath().contains("parquet")).count(), numLogFiles);
       assertEquals(statuses.count(), numLogFiles);
-      writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
+      //writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
       // Trigger a rollback of compaction
-      writeClient.rollback(newCommitTime);
+      table.getActiveTimeline().reload();
+      writeClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newCommitTime), table);
+
       table = HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
       tableRTFileSystemView = table.getSliceView();
       ((SyncableFileSystemView) tableRTFileSystemView).reset();
-      Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
-      System.out.println("Last Instant =" + lastInstant);
+
       for (String partitionPath : dataGen.getPartitionPaths()) {
-        assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
-        assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
+        List<FileSlice> fileSlices =  getFileSystemViewWithUnCommittedSlices(getHoodieMetaClient(hadoopConf, basePath))
+                .getAllFileSlices(partitionPath).filter(fs -> fs.getBaseInstantTime().equals("100")).collect(Collectors.toList());
+        assertTrue(fileSlices.stream().noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
+        assertTrue(fileSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
       }
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
+    testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, false);
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompactionUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
+    testInsertsGeneratedIntoLogFilesRollbackAfterCompaction(baseFileFormat, true);
+  }
+
   /**
    * Test to ensure metadata stats are correctly written to metadata file.
    */
-  @ParameterizedTest
-  @MethodSource("argumentsProvider")
-  public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat, Boolean rollbackUsingMarkers) throws Exception {
     init(baseFileFormat);
-
-    HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
+    HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
+        .withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       metaClient = getHoodieMetaClient(hadoopConf, basePath);
       HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
@@ -1136,24 +1191,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       records = dataGen.generateUpdates(instantTime, records);
       writeRecords = jsc.parallelize(records, 1);
       statuses = client.upsert(writeRecords, instantTime);
-      assertTrue(client.commit(instantTime, statuses), "Commit should succeed");
-
-      // Read from commit file
-      table = HoodieTable.create(cfg, hadoopConf);
-      metadata = HoodieCommitMetadata.fromBytes(
-          table.getActiveTimeline()
-              .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
-          HoodieCommitMetadata.class);
-      
+      //assertTrue(client.commit(instantTime, statuses), "Commit should succeed");
       inserts = 0;
       int upserts = 0;
-      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
-        for (HoodieWriteStat stat : pstat.getValue()) {
-          inserts += stat.getNumInserts();
-          upserts += stat.getNumUpdateWrites();
-        }
+      List<WriteStatus> writeStatusList = statuses.collect();
+      for (WriteStatus ws: writeStatusList) {
+        inserts += ws.getStat().getNumInserts();
+        upserts += ws.getStat().getNumUpdateWrites();
       }
 
+      // Read from commit file
       assertEquals(0, inserts);
       assertEquals(200, upserts);
 
@@ -1179,7 +1226,22 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   }
 
   /**
-   * Test to ensure metadata stats are correctly written to the metadata file, identifies small files and corrects them.
+   * Test to ensure rolling stats are correctly written to metadata file.
+   */
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testMetadataStatsOnCommitUsingFileList(HoodieFileFormat baseFileFormat) throws Exception {
+    testMetadataStatsOnCommit(baseFileFormat, false);
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsProvider")
+  public void testMetadataStatsOnCommitUsingMarkers(HoodieFileFormat baseFileFormat) throws Exception {
+    testMetadataStatsOnCommit(baseFileFormat, true);
+  }
+
+  /**
+   * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
    */
   @ParameterizedTest
   @MethodSource("argumentsProvider")
@@ -1385,11 +1447,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     return getConfigBuilder(autoCommit).build();
   }
 
+  private HoodieWriteConfig getConfig(Boolean autoCommit, Boolean rollbackUsingMarkers) {
+    return getConfigBuilder(autoCommit, rollbackUsingMarkers, IndexType.BLOOM).build();
+  }
+
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
     return getConfigBuilder(autoCommit, IndexType.BLOOM);
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieIndex.IndexType indexType) {
+    return getConfigBuilder(autoCommit, false, indexType);
+  }
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
@@ -1398,7 +1468,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
             .withEnableBackupForRemoteFileSystemView(false).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+        .withRollbackUsingMarkers(rollbackUsingMarkers);
   }
 
   private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
new file mode 100644
index 0000000..723d9e1
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.testutils.FileSystemTestUtils;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.IOType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMarkerFiles extends HoodieCommonTestHarness {
+
+  private MarkerFiles markerFiles;
+  private FileSystem fs;
+  private Path markerFolderPath;
+
+  @BeforeEach
+  public void setup() throws IOException {
+    initPath();
+    initMetaClient();
+    this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
+    this.markerFolderPath =  new Path(metaClient.getMarkerFolderPath("000"));
+    this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000");
+  }
+
+  private void createSomeMarkerFiles() {
+    markerFiles.create("2020/06/01", "file1", IOType.MERGE);
+    markerFiles.create("2020/06/02", "file2", IOType.APPEND);
+    markerFiles.create("2020/06/03", "file3", IOType.CREATE);
+  }
+
+  private void createInvalidFile(String partitionPath, String invalidFileName) {
+    Path path = FSUtils.getPartitionPath(markerFolderPath.toString(), partitionPath);
+    Path invalidFilePath = new Path(path, invalidFileName);
+    try {
+      fs.create(invalidFilePath, false).close();
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create invalid file " + invalidFilePath, e);
+    }
+  }
+
+  @Test
+  public void testCreation() throws Exception {
+    // when
+    createSomeMarkerFiles();
+
+    // then
+    assertTrue(fs.exists(markerFolderPath));
+    List<FileStatus> markerFiles = FileSystemTestUtils.listRecursive(fs, markerFolderPath)
+        .stream().filter(status -> status.getPath().getName().contains(".marker"))
+        .sorted().collect(Collectors.toList());
+    assertEquals(3, markerFiles.size());
+    assertIterableEquals(CollectionUtils.createImmutableList(
+        "file:" + markerFolderPath.toString() + "/2020/06/01/file1.marker.MERGE",
+        "file:" + markerFolderPath.toString() + "/2020/06/02/file2.marker.APPEND",
+        "file:" + markerFolderPath.toString() + "/2020/06/03/file3.marker.CREATE"),
+        markerFiles.stream().map(m -> m.getPath().toString()).collect(Collectors.toList())
+    );
+  }
+
+  @Test
+  public void testDeletionWhenMarkerDirExists() throws IOException {
+    //when
+    markerFiles.create("2020/06/01", "file1", IOType.MERGE);
+
+    // then
+    assertTrue(markerFiles.doesMarkerDirExist());
+    assertTrue(markerFiles.deleteMarkerDir());
+    assertFalse(markerFiles.doesMarkerDirExist());
+  }
+
+  @Test
+  public void testDeletionWhenMarkerDirNotExists() throws IOException {
+    // then
+    assertFalse(markerFiles.doesMarkerDirExist());
+    assertFalse(markerFiles.deleteMarkerDir());
+  }
+
+  @Test
+  public void testDataPathsWhenCreatingOrMerging() throws IOException {
+    // add markfiles
+    createSomeMarkerFiles();
+    // add invalid file
+    createInvalidFile("2020/06/01", "invalid_file3");
+    int fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).size();
+    assertEquals(fileSize,4);
+
+    // then
+    assertIterableEquals(CollectionUtils.createImmutableList(
+        "2020/06/01/file1", "2020/06/03/file3"),
+        markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList())
+    );
+  }
+
+  @Test
+  public void testAllMarkerPaths() throws IOException {
+    // given
+    createSomeMarkerFiles();
+
+    // then
+    assertIterableEquals(CollectionUtils.createImmutableList("2020/06/01/file1.marker.MERGE",
+        "2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"),
+        markerFiles.allMarkerFilePaths().stream().sorted().collect(Collectors.toList())
+    );
+  }
+
+  @Test
+  public void testStripMarkerSuffix() {
+    // Given
+    final String pathPrefix = "file://" + metaClient.getMetaPath() + "/file";
+    final String markerFilePath = pathPrefix + ".marker.APPEND";
+
+    // when-then
+    assertEquals(pathPrefix, MarkerFiles.stripMarkerSuffix(markerFilePath));
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 91b3bb0..09be8f1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -68,7 +68,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
             .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
 
-    HoodieClientTestUtils.fakeCommitFile(basePath, "001");
+    HoodieClientTestUtils.fakeCommit(basePath, "001");
     HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
new file mode 100644
index 0000000..24aa869
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.Assertions;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
+  protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
+                                                      List<FileSlice> secondPartitionCommit2FileSlices,
+                                                      HoodieWriteConfig cfg,
+                                                      boolean commitSecondUpsert) throws IOException {
+    //just generate two partitions
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    //1. prepare data
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
+    HoodieWriteClient client = getHoodieWriteClient(cfg);
+    /**
+     * Write 1 (only inserts)
+     */
+    String newCommitTime = "001";
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+    client.commit(newCommitTime, statuses);
+
+    /**
+     * Write 2 (updates)
+     */
+    newCommitTime = "002";
+    client.startCommitWithTime(newCommitTime);
+    records = dataGen.generateUpdates(newCommitTime, records);
+    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
+    Assertions.assertNoWriteErrors(statuses.collect());
+    if (commitSecondUpsert) {
+      client.commit(newCommitTime, statuses);
+    }
+
+
+    //2. assert filegroup and get the first partition fileslice
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
+    List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionCommit2FileGroups.size());
+    firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+    //3. assert filegroup and get the second partition fileslice
+    List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionCommit2FileGroups.size());
+    secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
+
+    //4. assert fileslice
+    HoodieTableType tableType = this.getTableType();
+    if (tableType.equals(HoodieTableType.COPY_ON_WRITE)) {
+      assertEquals(2, firstPartitionCommit2FileSlices.size());
+      assertEquals(2, secondPartitionCommit2FileSlices.size());
+    } else {
+      assertEquals(1, firstPartitionCommit2FileSlices.size());
+      assertEquals(1, secondPartitionCommit2FileSlices.size());
+    }
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
new file mode 100644
index 0000000..1c5a85a
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackTestBase {
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initFileSystem();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() throws IOException {
+    // Let's create some commit files and parquet files
+    String commitTime1 = "001";
+    String commitTime2 = "002";
+    new File(basePath + "/.hoodie").mkdirs();
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2015/03/16", "2015/03/17", "2016/03/15"},
+        basePath);
+    HoodieTestUtils.createCommitFiles(basePath, commitTime1, commitTime2);
+
+    // Make commit1
+    String file11 = HoodieTestUtils.createDataFile(basePath, "2015/03/16", commitTime1, "id11");
+    HoodieTestUtils.createNewLogFile(fs, basePath, "2015/03/16",
+        commitTime1, "id11", Option.of(3));
+    String file12 = HoodieTestUtils.createDataFile(basePath, "2015/03/17", commitTime1, "id12");
+
+    // Make commit2
+    String file21 = HoodieTestUtils.createDataFile(basePath, "2015/03/16", commitTime2, "id21");
+    String file22 = HoodieTestUtils.createDataFile(basePath, "2015/03/17", commitTime2, "id22");
+    HoodieTable table = this.getHoodieTable(metaClient, getConfig());
+    HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
+
+    // execute CopyOnWriteRollbackActionExecutor with filelisting mode
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(jsc, table.getConfig(), table, "003", needRollBackInstant, true);
+    assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
+    List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
+
+    // assert hoodieRollbackStats
+    assertEquals(hoodieRollbackStats.size(), 3);
+    hoodieRollbackStats.forEach(stat -> {
+      if (stat.getPartitionPath().equals("2015/03/16")) {
+        assertEquals(1, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
+        assertEquals("file:" + HoodieTestUtils.getDataFilePath(basePath, "2015/03/16", commitTime2, file21),
+            stat.getSuccessDeleteFiles().get(0));
+      } else if (stat.getPartitionPath().equals("2015/03/17")) {
+        assertEquals(1, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
+        assertEquals("file:" + HoodieTestUtils.getDataFilePath(basePath, "2015/03/17", commitTime2, file22),
+            stat.getSuccessDeleteFiles().get(0));
+      } else if (stat.getPartitionPath().equals("2016/03/15")) {
+        assertEquals(0, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
+      }
+    });
+
+    assertTrue(HoodieTestUtils.doesCommitExist(basePath, "001"));
+    assertTrue(HoodieTestUtils.doesInflightExist(basePath, "001"));
+    assertFalse(HoodieTestUtils.doesCommitExist(basePath, "002"));
+    assertFalse(HoodieTestUtils.doesInflightExist(basePath, "002"));
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2015/03/16", commitTime1, file11)
+        && HoodieTestUtils.doesDataFileExist(basePath, "2015/03/17", commitTime1, file12));
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2015/03/16", commitTime2, file21)
+        || HoodieTestUtils.doesDataFileExist(basePath, "2015/03/17", commitTime2, file22));
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
+    //1. prepare data and assert data result
+    List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+    List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+    HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
+    this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+    HoodieTable<?> table = this.getHoodieTable(metaClient, cfg);
+
+    //2. rollback
+    HoodieInstant commitInstant;
+    if (isUsingMarkers) {
+      commitInstant = table.getActiveTimeline().getCommitTimeline().filterInflights().lastInstant().get();
+    } else {
+      commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
+    }
+
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(jsc, cfg, table, "003", commitInstant, false);
+    if (!isUsingMarkers) {
+      assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
+    } else {
+      assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
+    }
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
+
+    //3. assert the rollback stat
+    assertEquals(2, rollbackMetadata.size());
+    for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
+      HoodieRollbackPartitionMetadata meta = entry.getValue();
+      assertTrue(meta.getFailedDeleteFiles() == null
+              || meta.getFailedDeleteFiles().size() == 0);
+      assertTrue(meta.getSuccessDeleteFiles() == null
+              || meta.getSuccessDeleteFiles().size() == 1);
+    }
+
+    //4. assert filegroup after rollback, and compare to the rollbackstat
+    // assert the first partition file group and file slice
+    List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileGroups.size());
+    List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileSlices.size());
+
+    if (!isUsingMarkers) {
+      firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
+      assertEquals(1, firstPartitionCommit2FileSlices.size());
+      assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+          rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
+    } else {
+      assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+          String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
+    }
+
+    // assert the second partition file group and file slice
+    List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileGroups.size());
+    List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileSlices.size());
+
+    // assert the second partition rollback file is equals rollBack1SecondPartitionStat
+    if (!isUsingMarkers) {
+      secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
+      assertEquals(1, secondPartitionCommit2FileSlices.size());
+      assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+          rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
+    } else {
+      assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+          String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
+    }
+
+    assertFalse(new MarkerFiles(table, commitInstant.getTimestamp()).doesMarkerDirExist());
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
new file mode 100644
index 0000000..c6652ed
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.FileSystemTestUtils;
+import org.apache.hudi.io.IOType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initFileSystem();
+    initMetaClient();
+    initDFS();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private void givenCommit0(boolean isDeltaCommit) throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2");
+    if (isDeltaCommit) {
+      HoodieClientTestUtils.fakeDeltaCommit(basePath, "000");
+    } else {
+      HoodieClientTestUtils.fakeCommit(basePath, "000");
+    }
+  }
+
+  private void givenInflightCommit1(boolean isDeltaCommit) throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1");
+    HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE);
+
+    HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3", IOType.CREATE);
+
+    if (isDeltaCommit) {
+      HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0);
+      HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND);
+      HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND);
+      HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001");
+    } else {
+      HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2");
+      HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE);
+      HoodieClientTestUtils.fakeInFlightCommit(basePath, "001");
+    }
+  }
+
+  @Test
+  public void testCopyOnWriteRollback() throws Exception {
+    // given: wrote some base files and corresponding markers
+    givenCommit0(false);
+    givenInflightCommit1(false);
+
+    // when
+    List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
+        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
+
+    // then: ensure files are deleted correctly, non-existent files reported as failed deletes
+    assertEquals(2, stats.size());
+
+    List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
+    List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
+
+    assertEquals(0, partBFiles.size());
+    assertEquals(1, partAFiles.size());
+    assertEquals(2, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
+    assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
+  }
+
+  @Test
+  public void testMergeOnReadRollback() throws Exception {
+    // given: wrote some base + log files and corresponding markers
+    givenCommit0(true);
+    givenInflightCommit1(true);
+
+    // when
+    List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
+        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "001"));
+
+    // then: ensure files are deleted, rollback block is appended (even if append does not exist)
+    assertEquals(2, stats.size());
+    // will have the log file
+    List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
+    assertEquals(1, partBFiles.size());
+    assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
+    assertTrue(partBFiles.get(0).getLen() > 0);
+
+    List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
+    assertEquals(3, partAFiles.size());
+    assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
+    assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
+
+    // only partB/f1_001 will be deleted
+    assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
+    // partA/f3_001 is non existent
+    assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
new file mode 100644
index 0000000..759e7f6
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackTestBase {
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    //just generate tow partitions
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    initFileSystem();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
+    //1. prepare data and assert data result
+    List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+    List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+    HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
+    twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
+    List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
+    List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
+    firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> firstPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, firstPartitionCommit2LogFiles.size());
+    secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> secondPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, secondPartitionCommit2LogFiles.size());
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+
+    //2. rollback
+    HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
+        jsc,
+        cfg,
+        table,
+        "003",
+        rollBackInstant,
+        true);
+    // assert is filelist mode
+    if (!isUsingMarkers) {
+      assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
+    } else {
+      assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
+    }
+
+    //3. assert the rollback stat
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
+    assertEquals(2, rollbackMetadata.size());
+
+    for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
+      HoodieRollbackPartitionMetadata meta = entry.getValue();
+      assertTrue(meta.getFailedDeleteFiles() == null || meta.getFailedDeleteFiles().size() == 0);
+      assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0);
+    }
+
+    //4. assert filegroup after rollback, and compare to the rollbackstat
+    // assert the first partition data and log file size
+    List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileGroups.size());
+    List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileSlices.size());
+    FileSlice firstPartitionRollBack1FileSlice = firstPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> firstPartitionRollBackLogFiles = firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+    assertEquals(2, firstPartitionRollBackLogFiles.size());
+
+    firstPartitionRollBackLogFiles.removeAll(firstPartitionCommit2LogFiles);
+    assertEquals(1, firstPartitionRollBackLogFiles.size());
+
+    // assert the second partition data and log file size
+    List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileGroups.size());
+    List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileSlices.size());
+    FileSlice secondPartitionRollBack1FileSlice = secondPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> secondPartitionRollBackLogFiles = secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+    assertEquals(2, secondPartitionRollBackLogFiles.size());
+
+    secondPartitionRollBackLogFiles.removeAll(secondPartitionCommit2LogFiles);
+    assertEquals(1, secondPartitionRollBackLogFiles.size());
+
+    assertFalse(new MarkerFiles(table, "002").doesMarkerDirExist());
+  }
+
+  @Test
+  public void testFailForCompletedInstants() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> {
+      HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
+      new MergeOnReadRollbackActionExecutor(
+              jsc,
+              getConfigBuilder().build(),
+              getHoodieTable(metaClient, getConfigBuilder().build()),
+              "003",
+              rollBackInstant,
+              true,
+              true,
+              true);
+    });
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackUtils.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackUtils.java
new file mode 100644
index 0000000..8db2069
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackUtils.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+
+public class TestRollbackUtils {
+
+  private FileStatus generateFileStatus(String filePath) {
+    Path dataFile1Path = new Path(filePath);
+    return new FileStatus(1, true, 1, 1, 1, 1,
+        FsPermission.valueOf("-rw-rw-rw-"), "one", "one", null, dataFile1Path);
+  }
+
+  @Test
+  public void testGenerateHeader() {
+    HoodieInstant hoodieInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101");
+    String instantToRollback = "1";
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = RollbackUtils.generateHeader(instantToRollback, hoodieInstant.getTimestamp());
+    Map<HoodieLogBlock.HeaderMetadataType, String> headerExpect = new HashMap<>(3);
+    headerExpect.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    headerExpect.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "1");
+    headerExpect.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, "0");
+    assertEquals(header, headerExpect);
+  }
+
+  @Test
+  public void testMergeRollbackStat() {
+    String partitionPath1 = "/partitionPath1/";
+    String partitionPath2 = "/partitionPath2/";
+    //prepare HoodieRollbackStat for different partition
+    Map<FileStatus, Boolean> dataFilesOnlyStat1Files = new HashMap<>();
+    dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile1.parquet"), true);
+    dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile2.parquet"), true);
+    HoodieRollbackStat dataFilesOnlyStat1 = HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath1)
+        .withDeletedFileResults(dataFilesOnlyStat1Files).build();
+
+    Map<FileStatus, Boolean> dataFilesOnlyStat2Files = new HashMap<>();
+    dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile1.parquet"), true);
+    dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile2.parquet"), true);
+    HoodieRollbackStat dataFilesOnlyStat2 = HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath2)
+        .withDeletedFileResults(dataFilesOnlyStat1Files).build();
+    //1. test different partitionpath merge
+    assertThrows(IllegalArgumentException.class, () -> {
+      RollbackUtils.mergeRollbackStat(dataFilesOnlyStat1,
+          dataFilesOnlyStat2);
+    }, "different partition rollbackstat merge will failed");
+
+    //prepare HoodieRollbackStat for failed and block append
+    Map<FileStatus, Boolean> dataFilesOnlyStat3Files = new HashMap<>();
+    dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile1.log"), true);
+    dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile3.parquet"), false);
+    HoodieRollbackStat dataFilesOnlyStat3 = HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath1)
+        .withDeletedFileResults(dataFilesOnlyStat3Files).build();
+
+    Map<FileStatus, Long> dataFilesOnlyStat4Files = new HashMap<>();
+    dataFilesOnlyStat4Files.put(generateFileStatus(partitionPath1 + "dataFile1.log"), 10L);
+    HoodieRollbackStat dataFilesOnlyStat4 = HoodieRollbackStat.newBuilder()
+        .withPartitionPath(partitionPath1)
+        .withRollbackBlockAppendResults(dataFilesOnlyStat4Files).build();
+
+    //2. test merge dataFilesOnlyStat1 and dataFilesOnlyStat3
+    HoodieRollbackStat dataFilesOnlyStatMerge1 =
+        RollbackUtils.mergeRollbackStat(dataFilesOnlyStat1, dataFilesOnlyStat3);
+    assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath());
+    assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet"),
+        dataFilesOnlyStatMerge1.getFailedDeleteFiles());
+    assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet",
+        partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
+        dataFilesOnlyStatMerge1.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList()));
+    assertEquals(0, dataFilesOnlyStatMerge1.getCommandBlocksCount().size());
+
+    //3. test merge dataFilesOnlyStatMerge1 and dataFilesOnlyStat4
+    HoodieRollbackStat dataFilesOnlyStatMerge2 =
+        RollbackUtils.mergeRollbackStat(dataFilesOnlyStatMerge1, dataFilesOnlyStat4);
+    assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath());
+    assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet").stream().sorted().collect(Collectors.toList()),
+        dataFilesOnlyStatMerge2.getFailedDeleteFiles().stream().sorted().collect(Collectors.toList()));
+    assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet",
+        partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()),
+        dataFilesOnlyStatMerge2.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList()));
+    assertEquals(CollectionUtils.createImmutableMap(generateFileStatus(partitionPath1 + "dataFile1.log"), 10L),
+        dataFilesOnlyStatMerge2.getCommandBlocksCount());
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index a3b534c..71be8f3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
@@ -71,7 +70,6 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   protected transient HoodieWriteClient writeClient;
   protected transient HoodieReadClient readClient;
   protected transient HoodieTableFileSystemView tableView;
-  protected transient HoodieTable hoodieTable;
 
   protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 5c57c25..dddc04d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.IOType;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieParquetWriter;
 
@@ -80,6 +81,7 @@ public class HoodieClientTestUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class);
   private static final Random RANDOM = new Random();
+  public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
 
   public static List<WriteStatus> collectStatuses(Iterator<List<WriteStatus>> statusListItr) {
     List<WriteStatus> statuses = new ArrayList<>();
@@ -124,11 +126,19 @@ public class HoodieClientTestUtils {
     new File(parentPath + "/" + instantTime + suffix).createNewFile();
   }
 
-  public static void fakeCommitFile(String basePath, String instantTime) throws IOException {
+  public static void fakeCommit(String basePath, String instantTime) throws IOException {
     fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
   }
 
-  public static void fakeInFlightFile(String basePath, String instantTime) throws IOException {
+  public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException {
+    fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
+  }
+
+  public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException {
+    fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+  }
+
+  public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException {
     fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION);
   }
 
@@ -146,6 +156,20 @@ public class HoodieClientTestUtils {
     new RandomAccessFile(path, "rw").setLength(length);
   }
 
+  public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
+          throws Exception {
+    fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
+  }
+
+  public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
+          throws Exception {
+    String parentPath = String.format("%s/%s", basePath, partitionPath);
+    new File(parentPath).mkdirs();
+    String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
+    new File(path).createNewFile();
+    new RandomAccessFile(path, "rw").setLength(length);
+  }
+
   /**
    * Returns a Spark config for this test.
    *
@@ -308,4 +332,25 @@ public class HoodieClientTestUtils {
     return HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, schema, filter,
         createCommitTime);
   }
+
+  public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime)
+      throws IOException {
+    return createMarkerFile(basePath, partitionPath, instantTime);
+  }
+
+  public static String createMarkerFile(String basePath, String partitionPath, String instantTime)
+          throws IOException {
+    return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE);
+  }
+
+  public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
+          throws IOException {
+    String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/";
+    new File(folderPath).mkdirs();
+    String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime,
+        HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
+    File f = new File(folderPath + markerFileName);
+    f.createNewFile();
+    return f.getAbsolutePath();
+  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index 20bcafd..b88cfff 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -90,7 +90,7 @@ public class HoodieMergeOnReadTestUtils {
     }).reduce((a, b) -> {
       a.addAll(b);
       return a;
-    }).orElse(new ArrayList<GenericRecord>());
+    }).orElse(new ArrayList<>());
   }
 
   private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema,
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
index 0eb88e2..2b41310 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
@@ -427,15 +427,33 @@ public class HoodieTestDataGenerator {
     return generateInsertsStream(commitTime, n, false, schemaStr);
   }
 
+  public List<HoodieRecord> generateInsertsContainsAllPartitions(String instantTime, Integer n) {
+    if (n < partitionPaths.length) {
+      throw new HoodieIOException("n must greater then partitionPaths length");
+    }
+    return generateInsertsStream(
+             instantTime,  n, false, TRIP_EXAMPLE_SCHEMA, true).collect(Collectors.toList());
+  }
+
   /**
    * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
    */
   public Stream<HoodieRecord> generateInsertsStream(
-      String instantTime, Integer n, boolean isFlattened, String schemaStr) {
-    int currSize = getNumExistingKeys(schemaStr);
+          String instantTime, Integer n, boolean isFlattened, String schemaStr) {
+    return generateInsertsStream(instantTime, n, isFlattened, schemaStr, false);
+  }
 
+  /**
+   * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
+   */
+  public Stream<HoodieRecord> generateInsertsStream(
+      String instantTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
+    int currSize = getNumExistingKeys(schemaStr);
     return IntStream.range(0, n).boxed().map(i -> {
       String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
+      if (containsAllPartitions && i < partitionPaths.length) {
+        partitionPath = partitionPaths[i];
+      }
       HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
       KeyPartition kp = new KeyPartition();
       kp.key = key;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
index 68ea531..a3191fa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common;
 import org.apache.hadoop.fs.FileStatus;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -85,6 +86,15 @@ public class HoodieRollbackStat implements Serializable {
       return this;
     }
 
+    public Builder withDeletedFileResult(String fileName, boolean isDeleted) {
+      if (isDeleted) {
+        successDeleteFiles = Collections.singletonList(fileName);
+      } else {
+        failedDeleteFiles = Collections.singletonList(fileName);
+      }
+      return this;
+    }
+
     public Builder withRollbackBlockAppendResults(Map<FileStatus, Long> commandBlocksCount) {
       this.commandBlocksCount = commandBlocksCount;
       return this;
@@ -96,6 +106,15 @@ public class HoodieRollbackStat implements Serializable {
     }
 
     public HoodieRollbackStat build() {
+      if (successDeleteFiles == null) {
+        successDeleteFiles = Collections.EMPTY_LIST;
+      }
+      if (failedDeleteFiles == null) {
+        failedDeleteFiles = Collections.EMPTY_LIST;
+      }
+      if (commandBlocksCount == null) {
+        commandBlocksCount = Collections.EMPTY_MAP;
+      }
       return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index fffd43f..6807f6f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -46,7 +45,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -116,22 +114,6 @@ public class FSUtils {
     return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, fileExtension);
   }
 
-  public static String makeMarkerFile(String instantTime, String writeToken, String fileId) {
-    return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, HoodieTableMetaClient.MARKER_EXTN);
-  }
-
-  public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs,
-                                                 String baseFileExtension) {
-    ValidationUtils.checkArgument(markerPath.endsWith(HoodieTableMetaClient.MARKER_EXTN));
-    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
-        new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTs))).toString();
-    int begin = markerPath.indexOf(markerRootPath);
-    ValidationUtils.checkArgument(begin >= 0,
-        "Not in marker dir. Marker Path=" + markerPath + ", Expected Marker Root=" + markerRootPath);
-    String rPath = markerPath.substring(begin + markerRootPath.length() + 1);
-    return String.format("%s/%s%s", basePath, rPath.replace(HoodieTableMetaClient.MARKER_EXTN, ""), baseFileExtension);
-  }
-
   public static String maskWithoutFileId(String instantTime, int taskPartitionId) {
     return String.format("*_%s_%s%s", taskPartitionId, instantTime, HoodieFileFormat.PARQUET.getFileExtension());
   }
@@ -171,15 +153,15 @@ public class FSUtils {
   /**
    * Given a base partition and a partition path, return relative path of partition path to the base path.
    */
-  public static String getRelativePartitionPath(Path basePath, Path partitionPath) {
+  public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) {
     basePath = Path.getPathWithoutSchemeAndAuthority(basePath);
-    partitionPath = Path.getPathWithoutSchemeAndAuthority(partitionPath);
-    String partitionFullPath = partitionPath.toString();
-    int partitionStartIndex = partitionFullPath.indexOf(basePath.getName(),
+    fullPartitionPath = Path.getPathWithoutSchemeAndAuthority(fullPartitionPath);
+    String fullPartitionPathStr = fullPartitionPath.toString();
+    int partitionStartIndex = fullPartitionPathStr.indexOf(basePath.getName(),
         basePath.getParent() == null ? 0 : basePath.getParent().toString().length());
     // Partition-Path could be empty for non-partitioned tables
-    return partitionStartIndex + basePath.getName().length() == partitionFullPath.length() ? ""
-        : partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1);
+    return partitionStartIndex + basePath.getName().length() == fullPartitionPathStr.length() ? ""
+        : fullPartitionPathStr.substring(partitionStartIndex + basePath.getName().length() + 1);
   }
 
   /**
@@ -199,19 +181,6 @@ public class FSUtils {
     return partitions;
   }
 
-  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
-      String markerDir, String baseFileExtension) throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    processFiles(fs, markerDir, (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs, baseFileExtension));
-      }
-      return true;
-    }, false);
-    return dataFiles;
-  }
-
   /**
    * Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its subdirs
    * are skipped
@@ -222,8 +191,8 @@ public class FSUtils {
    * @param excludeMetaFolder Exclude .hoodie folder
    * @throws IOException -
    */
-  static void processFiles(FileSystem fs, String basePathStr, Function<FileStatus, Boolean> consumer,
-      boolean excludeMetaFolder) throws IOException {
+  public static void processFiles(FileSystem fs, String basePathStr, Function<FileStatus, Boolean> consumer,
+                                  boolean excludeMetaFolder) throws IOException {
     PathFilter pathFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER;
     FileStatus[] topLevelStatuses = fs.listStatus(new Path(basePathStr));
     for (FileStatus child : topLevelStatuses) {
@@ -390,7 +359,7 @@ public class FSUtils {
 
   public static boolean isLogFile(Path logPath) {
     Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
-    return matcher.find();
+    return matcher.find() && logPath.getName().contains(".log");
   }
 
   /**
@@ -501,18 +470,6 @@ public class FSUtils {
     }
   }
 
-  public static void deleteOlderRestoreMetaFiles(FileSystem fs, String metaPath, Stream<HoodieInstant> instants) {
-    // TODO - this should be archived when archival is made general for all meta-data
-    // skip MIN_ROLLBACK_TO_KEEP and delete rest
-    instants.skip(MIN_ROLLBACK_TO_KEEP).map(s -> {
-      try {
-        return fs.delete(new Path(metaPath, s.getFileName()), false);
-      } catch (IOException e) {
-        throw new HoodieIOException("Could not delete restore meta files " + s.getFileName(), e);
-      }
-    });
-  }
-
   public static void createPathIfNotExists(FileSystem fs, Path partitionPath) throws IOException {
     if (!fs.exists(partitionPath)) {
       fs.mkdirs(partitionPath);
@@ -535,8 +492,8 @@ public class FSUtils {
   /**
    * Get DFS full partition path (e.g. hdfs://ip-address:8020:/<absolute path>)
    */
-  public static String getDFSFullPartitionPath(FileSystem fs, Path partitionPath) {
-    return fs.getUri() + partitionPath.toUri().getRawPath();
+  public static String getDFSFullPartitionPath(FileSystem fs, Path fullPartitionPath) {
+    return fs.getUri() + fullPartitionPath.toUri().getRawPath();
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
index 8432b26..76fdf18 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.common.fs.inline.InLineFSUtils;
 import org.apache.hudi.common.fs.inline.InLineFileSystem;
 import org.apache.hudi.common.fs.inline.InMemoryFileSystem;
@@ -26,6 +30,8 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
@@ -67,4 +73,13 @@ public class FileSystemTestUtils {
       throw new IOException(message);
     }
   }
+
+  public static List<FileStatus> listRecursive(FileSystem fs, Path path) throws IOException {
+    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
+    List<FileStatus> statuses = new ArrayList<>();
+    while (itr.hasNext()) {
+      statuses.add(itr.next());
+    }
+    return statuses;
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 8e1d080..31986b9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 
+import org.apache.hudi.exception.HoodieIOException;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
@@ -77,6 +78,17 @@ public class HoodieCommonTestHarness {
     return getFileSystemView(timeline, true);
   }
 
+  protected SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
+    try {
+      return new HoodieTableFileSystemView(metaClient,
+              metaClient.getActiveTimeline(),
+              HoodieTestUtils.listAllDataFilesAndLogFilesInPath(metaClient.getFs(), metaClient.getBasePath())
+      );
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error getting file system view", ioe);
+    }
+  }
+
   /**
    * Gets a default {@link HoodieTableType#COPY_ON_WRITE} table type. Sub-classes can override this method to specify a
    * new table type.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index caae246..cbf3af6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -18,6 +18,12 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -60,12 +66,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.ByteArrayInputStream;
@@ -268,12 +268,6 @@ public class HoodieTestUtils {
     return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length);
   }
 
-  public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime)
-      throws IOException {
-    String fileID = UUID.randomUUID().toString();
-    return createMarkerFile(basePath, partitionPath, instantTime, fileID);
-  }
-
   public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID)
       throws IOException {
     String folderPath = basePath + "/" + partitionPath + "/";
@@ -294,16 +288,6 @@ public class HoodieTestUtils {
     return fileID;
   }
 
-  public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID)
-      throws IOException {
-    String folderPath =
-        basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/";
-    new File(folderPath).mkdirs();
-    File f = new File(folderPath + FSUtils.makeMarkerFile(instantTime, DEFAULT_WRITE_TOKEN, fileID));
-    f.createNewFile();
-    return f.getAbsolutePath();
-  }
-
   public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime,
       String fileID, Option<Integer> version) throws IOException {
     String folderPath = basePath + "/" + partitionPath + "/";
@@ -465,7 +449,7 @@ public class HoodieTestUtils {
 
   // TODO: should be removed
   public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) throws IOException {
-    return listAllDataFilesInPath(fs, basePath, ".parquet");
+    return listAllDataFilesInPath(fs, basePath, HoodieFileFormat.PARQUET.getFileExtension());
   }
 
   public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath, String datafileExtension)
@@ -474,26 +458,31 @@ public class HoodieTestUtils {
     List<FileStatus> returns = new ArrayList<>();
     while (itr.hasNext()) {
       LocatedFileStatus status = itr.next();
-      if (status.getPath().getName().contains(datafileExtension)) {
+      if (status.getPath().getName().contains(datafileExtension) && !status.getPath().getName().contains(".marker")) {
         returns.add(status);
       }
     }
     return returns.toArray(new FileStatus[returns.size()]);
   }
 
-  public static FileStatus[] listAllLogFilesInPath(FileSystem fs, String basePath, String logfileExtension)
+  public static FileStatus[] listAllLogFilesInPath(FileSystem fs, String basePath)
       throws IOException {
     RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
     List<FileStatus> returns = new ArrayList<>();
     while (itr.hasNext()) {
       LocatedFileStatus status = itr.next();
-      if (status.getPath().getName().contains(logfileExtension)) {
+      if (status.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
         returns.add(status);
       }
     }
     return returns.toArray(new FileStatus[returns.size()]);
   }
 
+  public static FileStatus[] listAllDataFilesAndLogFilesInPath(FileSystem fs, String basePath) throws IOException {
+    return Stream.concat(Arrays.stream(listAllDataFilesInPath(fs, basePath)), Arrays.stream(listAllLogFilesInPath(fs, basePath)))
+            .toArray(FileStatus[]::new);
+  }
+
   public static List<String> monotonicIncreasingCommitTimestamps(int numTimestamps, int startSecsDelta) {
     Calendar cal = Calendar.getInstance();
     cal.add(Calendar.SECOND, startSecsDelta);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java
index 6cadbc5..20c7811 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -35,9 +37,7 @@ import org.apache.hudi.utilities.HoodieSnapshotExporter.Partitioner;
 import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
 
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -92,7 +92,6 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness {
     JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
     hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
     hdfsWriteClient.close();
-
     RemoteIterator<LocatedFileStatus> itr = dfs().listFiles(new Path(sourcePath), true);
     while (itr.hasNext()) {
       LOG.info(">>> Prepared test file: " + itr.next().getPath());