Posted to commits@hudi.apache.org by si...@apache.org on 2021/09/15 22:53:00 UTC

[hudi] branch master updated: [HUDI-2433] Refactor rollback actions in hudi-client module (#3664)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 916f12b  [HUDI-2433] Refactor rollback actions in hudi-client module (#3664)
916f12b is described below

commit 916f12b7dd4b635c790b17876fa49fb786233e94
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Sep 15 15:52:43 2021 -0700

    [HUDI-2433] Refactor rollback actions in hudi-client module (#3664)
---
 ...java => CopyOnWriteRollbackActionExecutor.java} |  48 ++--
 .../rollback/ListingBasedRollbackHelper.java       | 249 ++++++++++++++++++++
 ...ategy.java => MarkerBasedRollbackStrategy.java} |  68 +++++-
 ...java => MergeOnReadRollbackActionExecutor.java} |  68 ++++--
 .../client/common/HoodieFlinkEngineContext.java    |  12 +
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   4 +-
 .../hudi/table/HoodieFlinkMergeOnReadTable.java    |   4 +-
 .../FlinkCopyOnWriteRollbackActionExecutor.java    |  71 ------
 .../rollback/FlinkMarkerBasedRollbackStrategy.java |  90 --------
 .../FlinkMergeOnReadRollbackActionExecutor.java    |  77 -------
 .../rollback/ListingBasedRollbackHelper.java       | 250 --------------------
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |   2 +-
 .../client/common/HoodieJavaEngineContext.java     |  10 +
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   4 +-
 .../JavaCopyOnWriteRestoreActionExecutor.java      |   4 +-
 .../JavaCopyOnWriteRollbackActionExecutor.java     |  72 ------
 .../rollback/JavaListingBasedRollbackHelper.java   | 237 -------------------
 .../rollback/JavaMarkerBasedRollbackStrategy.java  |  78 -------
 .../client/common/HoodieSparkEngineContext.java    |   9 +
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   4 +-
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |   5 +-
 .../SparkCopyOnWriteRestoreActionExecutor.java     |   4 +-
 .../SparkMergeOnReadRestoreActionExecutor.java     |   4 +-
 .../rollback/ListingBasedRollbackHelper.java       | 252 ---------------------
 .../SparkCopyOnWriteRollbackActionExecutor.java    |  73 ------
 .../rollback/SparkMarkerBasedRollbackStrategy.java |  93 --------
 .../SparkMergeOnReadRollbackActionExecutor.java    |  82 -------
 .../TestCopyOnWriteRollbackActionExecutor.java     |  10 +-
 .../TestMergeOnReadRollbackActionExecutor.java     |  24 +-
 .../TestMarkerBasedRollbackStrategy.java           |   6 +-
 .../hudi/common/engine/HoodieEngineContext.java    |   4 +
 .../common/engine/HoodieLocalEngineContext.java    |  11 +
 .../hudi/common/function/FunctionWrapper.java      |  11 +
 .../common/function/SerializableBiFunction.java    |  34 +++
 34 files changed, 512 insertions(+), 1462 deletions(-)
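
At a glance, the diffstat tells the story of this refactor: the per-engine rollback executors, helpers, and marker strategies under hudi-flink-client, hudi-java-client, and hudi-spark-client are deleted, and single engine-agnostic versions land in hudi-client-common. Engine differences move behind HoodieEngineContext, which is why the stat shows a new mapToPairAndReduceByKey hook backed by the new SerializableBiFunction. Below is a minimal, self-contained sketch of that pattern; the types are simplified stand-ins, not Hudi's actual signatures:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class EngineAgnosticRollbackSketch {
      // Stand-in for HoodieEngineContext: each engine supplies its own
      // parallel-map primitive, so one rollback routine can serve them all.
      interface EngineContext {
        <I, O> List<O> map(List<I> data, Function<I, O> fn, int parallelism);
      }

      // One generic rollback routine instead of Flink/Java/Spark subclasses.
      static List<String> rollback(EngineContext context, List<String> files) {
        return context.map(files, f -> "rolled back " + f, files.size());
      }

      public static void main(String[] args) {
        // A local single-JVM context, in the spirit of HoodieLocalEngineContext.
        EngineContext local = new EngineContext() {
          @Override
          public <I, O> List<O> map(List<I> data, Function<I, O> fn, int parallelism) {
            return data.stream().parallel().map(fn).collect(Collectors.toList());
          }
        };
        System.out.println(rollback(local, Arrays.asList("f1.parquet", "f2.parquet")));
      }
    }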

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
similarity index 60%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index fa74f7f..44b5492 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -33,31 +33,40 @@ import org.apache.log4j.Logger;
 import java.util.ArrayList;
 import java.util.List;
 
-public abstract class BaseCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
+public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
 
-  private static final Logger LOG = LogManager.getLogger(BaseCopyOnWriteRollbackActionExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(CopyOnWriteRollbackActionExecutor.class);
 
-  public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, I, K, O> table,
-                                               String instantTime,
-                                               HoodieInstant commitInstant,
-                                               boolean deleteInstants) {
+  public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
+                                           HoodieWriteConfig config,
+                                           HoodieTable<T, I, K, O> table,
+                                           String instantTime,
+                                           HoodieInstant commitInstant,
+                                           boolean deleteInstants) {
     super(context, config, table, instantTime, commitInstant, deleteInstants);
   }
 
-  public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, I, K, O> table,
-                                               String instantTime,
-                                               HoodieInstant commitInstant,
-                                               boolean deleteInstants,
-                                               boolean skipTimelinePublish,
-                                               boolean useMarkerBasedStrategy) {
+  public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
+                                           HoodieWriteConfig config,
+                                           HoodieTable<T, I, K, O> table,
+                                           String instantTime,
+                                           HoodieInstant commitInstant,
+                                           boolean deleteInstants,
+                                           boolean skipTimelinePublish,
+                                           boolean useMarkerBasedStrategy) {
     super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
   }
 
   @Override
+  protected RollbackStrategy getRollbackStrategy() {
+    if (useMarkerBasedStrategy) {
+      return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
+    } else {
+      return this::executeRollbackUsingFileListing;
+    }
+  }
+
+  @Override
   protected List<HoodieRollbackStat> executeRollback() {
     HoodieTimer rollbackTimer = new HoodieTimer();
     rollbackTimer.startTimer();
@@ -88,4 +97,11 @@ public abstract class BaseCopyOnWriteRollbackActionExecutor<T extends HoodieReco
     LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
     return stats;
   }
+
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
+        context, table.getMetaClient().getBasePath(), config);
+    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
+  }
 }
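
The getRollbackStrategy override above returns either the shared MarkerBasedRollbackStrategy or a method reference to the listing-based fallback. This works because RollbackStrategy (declared in BaseRollbackActionExecutor, outside this diff) is a single-abstract-method interface. A self-contained sketch of the selection, with simplified stand-in types:

    import java.util.Collections;
    import java.util.List;

    public class RollbackStrategySelectionSketch {
      // Stand-in for BaseRollbackActionExecutor.RollbackStrategy: one abstract
      // method, so either a class instance or a method reference satisfies it.
      interface RollbackStrategy {
        List<String> execute(String instantToRollback);
      }

      static RollbackStrategy getRollbackStrategy(boolean useMarkerBasedStrategy) {
        if (useMarkerBasedStrategy) {
          // A dedicated class in the real code (MarkerBasedRollbackStrategy).
          return instant -> Collections.singletonList("marker-based rollback of " + instant);
        } else {
          // Mirrors this::executeRollbackUsingFileListing in the diff above.
          return RollbackStrategySelectionSketch::executeRollbackUsingFileListing;
        }
      }

      static List<String> executeRollbackUsingFileListing(String instant) {
        return Collections.singletonList("listing-based rollback of " + instant);
      }

      public static void main(String[] args) {
        System.out.println(getRollbackStrategy(false).execute("20210915225243"));
      }
    }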
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
new file mode 100644
index 0000000..8490872
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Performs rollback of Hoodie tables.
+ */
+public class ListingBasedRollbackHelper implements Serializable {
+  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final HoodieWriteConfig config;
+
+  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Performs all collected rollback actions in parallel.
+   */
+  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                  List<ListingBasedRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+    return context.mapToPairAndReduceByKey(rollbackRequests,
+        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, true),
+        RollbackUtils::mergeRollbackStat,
+        parallelism);
+  }
+
+  /**
+   * Collects all file info that needs to be rolled back.
+   */
+  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                       List<ListingBasedRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+    return context.mapToPairAndReduceByKey(rollbackRequests,
+        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false),
+        RollbackUtils::mergeRollbackStat,
+        parallelism);
+  }
+
+  /**
+   * Either deletes the files of interest and collects stats, or collects stats only.
+   *
+   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+   * @param doDelete          {@code true} if deletion has to be done.
+   *                          {@code false} if only stats are to be collected w/o performing any deletes.
+   * @return stats collected with or w/o actual deletions.
+   */
+  private Pair<String, HoodieRollbackStat> maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
+                                                                      HoodieInstant instantToRollback,
+                                                                      boolean doDelete) throws IOException {
+    switch (rollbackRequest.getType()) {
+      case DELETE_DATA_FILES_ONLY: {
+        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
+            rollbackRequest.getPartitionPath(), doDelete);
+        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                .withDeletedFileResults(filesToDeletedStatus).build());
+      }
+      case DELETE_DATA_AND_LOG_FILES: {
+        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
+        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                .withDeletedFileResults(filesToDeletedStatus).build());
+      }
+      case APPEND_ROLLBACK_BLOCK: {
+        String fileId = rollbackRequest.getFileId().get();
+        String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+
+        // collect all log files that are supposed to be deleted with this rollback
+        Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+            FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+            fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+            .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+
+        HoodieLogFormat.Writer writer = null;
+        try {
+          writer = HoodieLogFormat.newWriterBuilder()
+              .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+              .withFileId(fileId)
+              .overBaseCommit(latestBaseInstant)
+              .withFs(metaClient.getFs())
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+          // generate metadata
+          if (doDelete) {
+            Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
+            // if update belongs to an existing log file
+            writer.appendBlock(new HoodieCommandBlock(header));
+          }
+        } catch (IOException | InterruptedException io) {
+          throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
+        } finally {
+          try {
+            if (writer != null) {
+              writer.close();
+            }
+          } catch (IOException io) {
+            throw new HoodieIOException("Error appending rollback block.", io);
+          }
+        }
+
+        // This step is intentionally done after writer is closed. Guarantees that
+        // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+        // cloud-storage : HUDI-168
+        Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
+            metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+            1L
+        );
+
+        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                .withRollbackBlockAppendResults(filesToNumBlocksRollback)
+                .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
+      }
+      default:
+        throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+    }
+  }
+
+  /**
+   * Common method used for cleaning out base and log files under a partition path during rollback of a set of commits.
+   */
+  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+                                                         String commit, String partitionPath, boolean doDelete) throws IOException {
+    LOG.info("Cleaning path " + partitionPath);
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    SerializablePathFilter filter = (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+
+    final Map<FileStatus, Boolean> results = new HashMap<>();
+    FileSystem fs = metaClient.getFs();
+    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+    for (FileStatus file : toBeDeleted) {
+      if (doDelete) {
+        boolean success = fs.delete(file.getPath(), false);
+        results.put(file, success);
+        LOG.info("Delete file " + file.getPath() + "\t" + success);
+      } else {
+        results.put(file, true);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
+   */
+  private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+                                                   String commit, String partitionPath, boolean doDelete) throws IOException {
+    final Map<FileStatus, Boolean> results = new HashMap<>();
+    LOG.info("Cleaning path " + partitionPath);
+    FileSystem fs = metaClient.getFs();
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+    for (FileStatus file : toBeDeleted) {
+      if (doDelete) {
+        boolean success = fs.delete(file.getPath(), false);
+        results.put(file, success);
+        LOG.info("Delete file " + file.getPath() + "\t" + success);
+      } else {
+        results.put(file, true);
+      }
+    }
+    return results;
+  }
+
+  private Map<HeaderMetadataType, String> generateHeader(String commit) {
+    // generate metadata
+    Map<HeaderMetadataType, String> header = new HashMap<>(3);
+    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+
+  }
+}
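
Two details of the new helper are worth calling out. First, both performRollback and collectRollbackStats clamp the requested parallelism to the range [1, rollbackRequests.size()] before handing work to the engine context; the sketch below isolates that arithmetic. Second, SerializablePathFilter at the bottom of the file exists because Hadoop's PathFilter is not Serializable on its own; intersecting it with Serializable lets the filter lambda ship to remote tasks.

    public class ParallelismClampSketch {
      // Mirrors: Math.max(Math.min(rollbackRequests.size(), configured), 1)
      static int effectiveParallelism(int numRequests, int configuredParallelism) {
        return Math.max(Math.min(numRequests, configuredParallelism), 1);
      }

      public static void main(String[] args) {
        System.out.println(effectiveParallelism(3, 100));   // 3   -> no idle tasks
        System.out.println(effectiveParallelism(500, 100)); // 100 -> capped by config
        System.out.println(effectiveParallelism(0, 100));   // 1   -> never zero
      }
    }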
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
similarity index 63%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index cc596ba..1bfd4b1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -20,34 +20,43 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
+import org.apache.hudi.table.marker.WriteMarkers;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Performs rollback using marker files generated during the write.
  */
-public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecordPayload, I, K, O> implements BaseRollbackActionExecutor.RollbackStrategy {
+public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractMarkerBasedRollbackStrategy.class);
+  private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
 
-  protected final HoodieTable<T, I, K, O> table;
+  protected final HoodieTable<?, ?, ?, ?> table;
 
   protected final transient HoodieEngineContext context;
 
@@ -57,7 +66,7 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
 
   protected final String instantTime;
 
-  public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
+  public MarkerBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
     this.table = table;
     this.context = context;
     this.basePath = table.getMetaClient().getBasePath();
@@ -124,8 +133,8 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
 
     // the information of files appended to is required for metadata sync
     Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-          table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-          1L);
+        table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+        1L);
 
     return HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
@@ -135,13 +144,48 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
 
   /**
    * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing.
-   * @param partitionPath partition path of interest
-   * @param baseCommitTime base commit time of interest
-   * @param fileId fileId of interest
+   *
+   * @param partitionPathStr partition path of interest
+   * @param baseCommitTime   base commit time of interest
+   * @param fileId           fileId of interest
    * @return Map<FileStatus, File size>
    * @throws IOException
    */
-  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException {
-    return Collections.EMPTY_MAP;
+  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+    // collect all log files that are supposed to be deleted with this rollback
+    return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
+        FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
+        .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+  }
+
+  @Override
+  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
+    try {
+      List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
+          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
+      int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
+      context.setJobStatus(this.getClass().getSimpleName(), "Rolling back using marker files");
+      return context.mapToPairAndReduceByKey(markerPaths, markerFilePath -> {
+        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+        IOType type = IOType.valueOf(typeStr);
+        HoodieRollbackStat rollbackStat;
+        switch (type) {
+          case MERGE:
+            rollbackStat = undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
+            break;
+          case APPEND:
+            rollbackStat = undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
+            break;
+          case CREATE:
+            rollbackStat = undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
+            break;
+          default:
+            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
+        }
+        return new ImmutablePair<>(rollbackStat.getPartitionPath(), rollbackStat);
+      }, RollbackUtils::mergeRollbackStat, parallelism);
+    } catch (Exception e) {
+      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
+    }
   }
 }
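
The execute method consolidated above recovers the write operation from the marker file's final suffix (marker files are named <data_file_name>.marker.<IOType>) and dispatches to undoMerge, undoAppend, or undoCreate. A self-contained sketch of the parsing step; the enum is a stand-in for org.apache.hudi.common.model.IOType and the sample path is hypothetical:

    public class MarkerTypeParsingSketch {
      // Stand-in for org.apache.hudi.common.model.IOType.
      enum IOType { CREATE, MERGE, APPEND }

      // Mirrors: markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1)
      static IOType markerType(String markerFilePath) {
        return IOType.valueOf(markerFilePath.substring(markerFilePath.lastIndexOf('.') + 1));
      }

      public static void main(String[] args) {
        String marker = "2021/09/15/f1_1-0-1_20210915225243.parquet.marker.MERGE";
        System.out.println(markerType(marker)); // MERGE
      }
    }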
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
similarity index 54%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 2e75144..87d2628 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.action.rollback;
@@ -24,38 +25,50 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
+public class MergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
 
-  private static final Logger LOG = LogManager.getLogger(BaseMergeOnReadRollbackActionExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class);
 
-  public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, I, K, O> table,
-                                               String instantTime,
-                                               HoodieInstant commitInstant,
-                                               boolean deleteInstants) {
+  public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+                                           HoodieWriteConfig config,
+                                           HoodieTable<T, I, K, O> table,
+                                           String instantTime,
+                                           HoodieInstant commitInstant,
+                                           boolean deleteInstants) {
     super(context, config, table, instantTime, commitInstant, deleteInstants);
   }
 
-  public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, I, K, O> table,
-                                               String instantTime,
-                                               HoodieInstant commitInstant,
-                                               boolean deleteInstants,
-                                               boolean skipTimelinePublish,
-                                               boolean useMarkerBasedStrategy) {
+  public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+                                           HoodieWriteConfig config,
+                                           HoodieTable<T, I, K, O> table,
+                                           String instantTime,
+                                           HoodieInstant commitInstant,
+                                           boolean deleteInstants,
+                                           boolean skipTimelinePublish,
+                                           boolean useMarkerBasedStrategy) {
     super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
   }
 
   @Override
+  protected RollbackStrategy getRollbackStrategy() {
+    if (useMarkerBasedStrategy) {
+      return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
+    } else {
+      return this::executeRollbackUsingFileListing;
+    }
+  }
+
+  @Override
   protected List<HoodieRollbackStat> executeRollback() {
     HoodieTimer rollbackTimer = new HoodieTimer();
     rollbackTimer.startTimer();
@@ -93,4 +106,15 @@ public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieReco
     LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
     return allRollbackStats;
   }
+
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
+    List<ListingBasedRollbackRequest> rollbackRequests;
+    try {
+      rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
+    }
+    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
+  }
 }
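
Note how executeRollbackUsingFileListing wraps the checked IOException from request generation in Hudi's runtime HoodieIOException, so the RollbackStrategy functional interface stays free of checked exceptions. The same shape with JDK-only types, UncheckedIOException standing in for HoodieIOException:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Collections;
    import java.util.List;

    public class CheckedWrappingSketch {
      // Hypothetical generator standing in for
      // RollbackUtils.generateRollbackRequestsUsingFileListingMOR.
      static List<String> generateRollbackRequests() throws IOException {
        return Collections.singletonList("partition=2021/09/15");
      }

      static List<String> executeRollbackUsingFileListing() {
        try {
          return generateRollbackRequests();
        } catch (IOException e) {
          throw new UncheckedIOException("Error generating rollback requests by file listing.", e);
        }
      }

      public static void main(String[] args) {
        System.out.println(executeRollbackUsingFileListing());
      }
    }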
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 2fc5af1..66b7e78 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
@@ -32,6 +33,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -43,6 +45,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
 
 /**
  * A flink engine implementation of HoodieEngineContext.
@@ -75,6 +78,15 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
   }
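
The mapToPairAndReduceByKey added above is the engine hook the shared rollback code now depends on: map each input to a (key, value) pair, then merge values that share a key. The Flink context does this with parallel java.util streams, as shown; the Spark override is not visible in this excerpt. An isolated, runnable version of the same pipeline, with JDK types standing in for SerializablePairFunction and SerializableBiFunction:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map.Entry;
    import java.util.Objects;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class MapToPairAndReduceByKeySketch {
      static <I, K, V> List<V> mapToPairAndReduceByKey(
          List<I> data, Function<I, Entry<K, V>> mapToPair, BinaryOperator<V> reduce) {
        return data.stream().parallel().map(mapToPair)
            .collect(Collectors.groupingBy(Entry::getKey)).values().stream()
            .map(list -> list.stream().map(Entry::getValue).reduce(reduce).orElse(null))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        // Count files per partition path, analogous to merging per-partition
        // HoodieRollbackStats via RollbackUtils::mergeRollbackStat.
        List<Integer> counts = mapToPairAndReduceByKey(
            Arrays.asList("2021/09/14/f1", "2021/09/14/f2", "2021/09/15/f3"),
            f -> new SimpleEntry<String, Integer>(f.substring(0, 10), 1),
            Integer::sum);
        System.out.println(counts); // e.g. [2, 1]
      }
    }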
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 8a9b4bf..27571bc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -53,7 +53,7 @@ import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -305,7 +305,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
-    return new FlinkCopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+    return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index bfe8b6f..4614270 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -37,7 +37,7 @@ import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExe
 import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
-import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
 import java.util.List;
 import java.util.Map;
@@ -108,7 +108,7 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
 
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
-    return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+    return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
   }
 }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
deleted file mode 100644
index 47039a3..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-  public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants);
-  }
-
-  public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants,
-                                                boolean skipTimelinePublish,
-                                                boolean useMarkerBasedStrategy) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
-  }
-
-  @Override
-  protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
-    if (useMarkerBasedStrategy) {
-      return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
-    } else {
-      return this::executeRollbackUsingFileListing;
-    }
-  }
-
-  @Override
-  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
-        context, table.getMetaClient().getBasePath(), config);
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
-  }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
deleted file mode 100644
index bb7ec76..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
-import org.apache.hudi.table.marker.WriteMarkers;
-
-import org.apache.hadoop.fs.FileStatus;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-  public FlinkMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
-    super(table, context, config, instantTime);
-  }
-
-  @Override
-  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
-    try {
-      List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
-          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
-      List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
-        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
-        IOType type = IOType.valueOf(typeStr);
-        switch (type) {
-          case MERGE:
-            return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          case APPEND:
-            return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
-          case CREATE:
-            return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          default:
-            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
-        }
-      }, 0);
-
-      return rollbackStats.stream().map(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
-          .collect(Collectors.groupingBy(Tuple2::_1))
-          .values()
-          .stream()
-          .map(x -> x.stream().map(y -> y._2).reduce(RollbackUtils::mergeRollbackStat).get())
-          .collect(Collectors.toList());
-    } catch (Exception e) {
-      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
-    }
-  }
-
-  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
-    // collect all log files that is supposed to be deleted with this rollback
-    return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
-        FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
-        .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-  }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
deleted file mode 100644
index 25b20a5..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.HoodieTable;
-
-import java.io.IOException;
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class FlinkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
-    BaseMergeOnReadRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-  public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants);
-  }
-
-  public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants,
-                                                boolean skipTimelinePublish,
-                                                boolean useMarkerBasedStrategy) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
-  }
-
-  @Override
-  protected RollbackStrategy getRollbackStrategy() {
-    if (useMarkerBasedStrategy) {
-      return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
-    } else {
-      return this::executeRollbackUsingFileListing;
-    }
-  }
-
-  @Override
-  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
-    List<ListingBasedRollbackRequest> rollbackRequests;
-    try {
-      rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
-    }
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
-  }
-}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
deleted file mode 100644
index f03b211..0000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Performs Rollback of Hoodie Tables.
- */
-public class ListingBasedRollbackHelper implements Serializable {
-
-  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
-
-  private final HoodieTableMetaClient metaClient;
-  private final HoodieWriteConfig config;
-
-  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    this.metaClient = metaClient;
-    this.config = config;
-  }
-
-  /**
-   * Performs all rollback actions that we have collected in parallel.
-   */
-  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
-
-    Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
-        .stream()
-        .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
-    return collect.values().stream()
-        .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
-        .filter(Objects::nonNull)
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Collects all file info that needs to be rolled back.
-   */
-  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
-    return new ArrayList<>(partitionPathRollbackStatsPairs.values());
-  }
-
-  /**
-   * Deletes the files of interest and collects stats, or only collects stats, depending on {@code doDelete}.
-   *
-   * @param context           instance of {@link HoodieEngineContext} to use.
-   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or stats collection is requested.
-   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
-   * @param doDelete          {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
-   * @return stats collected with or w/o actual deletions.
-   */
-  Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
-                                                             HoodieInstant instantToRollback,
-                                                             List<ListingBasedRollbackRequest> rollbackRequests,
-                                                             boolean doDelete) {
-    return context.mapToPair(rollbackRequests, rollbackRequest -> {
-      switch (rollbackRequest.getType()) {
-        case DELETE_DATA_FILES_ONLY: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
-              rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case DELETE_DATA_AND_LOG_FILES: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case APPEND_ROLLBACK_BLOCK: {
-          String fileId = rollbackRequest.getFileId().get();
-          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
-
-          // collect all log files that are supposed to be deleted by this rollback
-          Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
-              FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
-              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
-              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-
-          HoodieLogFormat.Writer writer = null;
-          try {
-            writer = HoodieLogFormat.newWriterBuilder()
-                .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
-                .withFileId(fileId)
-                .overBaseCommit(latestBaseInstant)
-                .withFs(metaClient.getFs())
-                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-            // generate metadata
-            if (doDelete) {
-              Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
-              // append a command block instructing readers to roll back the previously written data block
-              writer.appendBlock(new HoodieCommandBlock(header));
-            }
-          } catch (IOException | InterruptedException io) {
-            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
-          } finally {
-            try {
-              if (writer != null) {
-                writer.close();
-              }
-            } catch (IOException io) {
-              throw new HoodieIOException("Error appending rollback block.", io);
-            }
-          }
-
-          // This step is intentionally done after writer is closed. Guarantees that
-          // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
-          // cloud-storage : HUDI-168
-          Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-              metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-              1L
-          );
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-                  .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
-        }
-        default:
-          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
-      }
-    }, 0);
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                         String commit, String partitionPath, boolean doDelete) throws IOException {
-    LOG.info("Cleaning path " + partitionPath);
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    SerializablePathFilter filter = (path) -> {
-      if (path.toString().endsWith(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      } else if (FSUtils.isLogFile(path)) {
-        // Since the baseCommitTime is the only commit for new log files, it's okay here
-        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    FileSystem fs = metaClient.getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                   String commit, String partitionPath, boolean doDelete) throws IOException {
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    LOG.info("Cleaning path " + partitionPath);
-    FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
-    // generate metadata
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
-    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
-    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    return header;
-  }
-
-  public interface SerializablePathFilter extends PathFilter, Serializable {
-
-  }
-}
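
The tail of performRollback above, which groups the (partitionPath, HoodieRollbackStat) pairs
and folds each group with RollbackUtils::mergeRollbackStat, is the pattern this refactor hoists
into the engine context as mapToPairAndReduceByKey (see the HoodieJavaEngineContext and
HoodieSparkEngineContext hunks below). A minimal sketch of the consolidated call, assuming a
rollbackStats list and a parallelism value are in scope:

    // Sketch only: one engine-agnostic call replaces the hand-rolled groupingBy + reduce above.
    List<HoodieRollbackStat> merged = context.mapToPairAndReduceByKey(
        rollbackStats,                                  // List<HoodieRollbackStat>, possibly several per partition
        stat -> Pair.of(stat.getPartitionPath(), stat), // key each stat by its partition path
        RollbackUtils::mergeRollbackStat,               // merge the stats that share a partition path
        parallelism);
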
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 59e94e5..cb024c6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -27,8 +27,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
 import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
 
 import java.util.List;
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 013e094..f7a28e2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
@@ -40,6 +41,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
 
 /**
  * A java engine implementation of HoodieEngineContext.
@@ -60,6 +62,14 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
+        .collect(Collectors.toList());
+  }
+
+  @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
   }
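
The reduce function in the new override is typed with SerializableBiFunction, imported above
from org.apache.hudi.common.function but not shown in this excerpt. A plausible minimal shape,
inferred from the reduceFunc::apply call sites and from the Java path routing it through
throwingReduceWrapper (which suggests apply is declared to throw):

    import java.io.Serializable;

    // Assumed sketch, not the actual source: a serializable two-argument function.
    @FunctionalInterface
    public interface SerializableBiFunction<T1, T2, R> extends Serializable {
      R apply(T1 v1, T2 v2) throws Exception; // checked throws inferred from throwingReduceWrapper
    }
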
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 06e66a1..7715bf9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -50,7 +50,7 @@ import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
-import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
 import java.util.List;
@@ -193,7 +193,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
                                          String rollbackInstantTime,
                                          HoodieInstant commitInstant,
                                          boolean deleteInstants) {
-    return new JavaCopyOnWriteRollbackActionExecutor(
+    return new CopyOnWriteRollbackActionExecutor(
         context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
index 75c1e0e..f7677ae 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 
 import java.util.List;
 
@@ -48,7 +48,7 @@ public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload>
   @Override
   protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
     table.getMetaClient().reloadActiveTimeline();
-    JavaCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new JavaCopyOnWriteRollbackActionExecutor(
+    CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
         context,
         config,
         table,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java
deleted file mode 100644
index 15e3932..0000000
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class JavaCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCopyOnWriteRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-  public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                               String instantTime,
-                                               HoodieInstant commitInstant,
-                                               boolean deleteInstants) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants);
-  }
-
-  public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
-                                               HoodieWriteConfig config,
-                                               HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                               String instantTime,
-                                               HoodieInstant commitInstant,
-                                               boolean deleteInstants,
-                                               boolean skipTimelinePublish,
-                                               boolean useMarkerBasedStrategy) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
-  }
-
-  @Override
-  protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
-    if (useMarkerBasedStrategy) {
-      return new JavaMarkerBasedRollbackStrategy(table, context, config, instantTime);
-    } else {
-      return this::executeRollbackUsingFileListing;
-    }
-  }
-
-  @Override
-  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
-        context, table.getMetaClient().getBasePath(), config);
-    return new JavaListingBasedRollbackHelper(table.getMetaClient(), config)
-        .performRollback(context, instantToRollback, rollbackRequests);
-  }
-}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java
deleted file mode 100644
index 5331ca5..0000000
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Performs Rollback of Hoodie Tables.
- */
-public class JavaListingBasedRollbackHelper implements Serializable {
-
-  private static final Logger LOG = LogManager.getLogger(JavaListingBasedRollbackHelper.class);
-
-  private final HoodieTableMetaClient metaClient;
-  private final HoodieWriteConfig config;
-
-  public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    this.metaClient = metaClient;
-    this.config = config;
-  }
-
-  /**
-   * Performs, in parallel, all the rollback actions that we have collected.
-   */
-  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
-
-    Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
-        .stream()
-        .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
-    return collect.values().stream()
-        .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
-        .filter(Objects::nonNull)
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Collects all file info that needs to be rolled back.
-   */
-  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
-    return new ArrayList<>(partitionPathRollbackStatsPairs.values());
-  }
-
-  /**
-   * Deletes the files of interest and collects stats, or only collects stats, depending on {@code doDelete}.
-   *
-   * @param context           instance of {@link HoodieEngineContext} to use.
-   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or stats collection is requested.
-   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
-   * @param doDelete          {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
-   * @return stats collected with or w/o actual deletions.
-   */
-  Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
-                                                             HoodieInstant instantToRollback,
-                                                             List<ListingBasedRollbackRequest> rollbackRequests,
-                                                             boolean doDelete) {
-    return context.mapToPair(rollbackRequests, rollbackRequest -> {
-      switch (rollbackRequest.getType()) {
-        case DELETE_DATA_FILES_ONLY: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
-              rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case DELETE_DATA_AND_LOG_FILES: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case APPEND_ROLLBACK_BLOCK: {
-          HoodieLogFormat.Writer writer = null;
-          try {
-            writer = HoodieLogFormat.newWriterBuilder()
-                .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
-                .withFileId(rollbackRequest.getFileId().get())
-                .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
-                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-            // generate metadata
-            if (doDelete) {
-              Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
-              // append a command block instructing readers to roll back the previously written data block
-              writer.appendBlock(new HoodieCommandBlock(header));
-            }
-          } catch (IOException | InterruptedException io) {
-            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
-          } finally {
-            try {
-              if (writer != null) {
-                writer.close();
-              }
-            } catch (IOException io) {
-              throw new HoodieIOException("Error appending rollback block.", io);
-            }
-          }
-
-          // This step is intentionally done after writer is closed. Guarantees that
-          // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
-          // cloud-storage : HUDI-168
-          Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-              metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L
-          );
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
-        }
-        default:
-          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
-      }
-    }, 0);
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                         String commit, String partitionPath, boolean doDelete) throws IOException {
-    LOG.info("Cleaning path " + partitionPath);
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    SerializablePathFilter filter = (path) -> {
-      if (path.toString().endsWith(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      } else if (FSUtils.isLogFile(path)) {
-        // Since the baseCommitTime is the only commit for new log files, it's okay here
-        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    FileSystem fs = metaClient.getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                   String commit, String partitionPath, boolean doDelete) throws IOException {
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    LOG.info("Cleaning path " + partitionPath);
-    FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  private Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String commit) {
-    // generate metadata
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
-    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
-    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    return header;
-  }
-
-  public interface SerializablePathFilter extends PathFilter, Serializable {
-
-  }
-}
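
For concreteness, the command block appended in the APPEND_ROLLBACK_BLOCK branch above carries
the three header entries built by generateHeader. With hypothetical instant times, the block
that lands in the log file looks like:

    // Hypothetical values: rolling back instant 20210915000000 while the latest
    // instant on the active timeline is 20210915010000.
    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "20210915010000");        // lastInstant() timestamp
    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "20210915000000"); // instant being rolled back
    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
    writer.appendBlock(new HoodieCommandBlock(header)); // log readers then skip blocks written by the target instant
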
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
deleted file mode 100644
index 150f663..0000000
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
-import org.apache.hudi.table.marker.WriteMarkers;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class JavaMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-  public JavaMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                                         HoodieEngineContext context,
-                                         HoodieWriteConfig config,
-                                         String instantTime) {
-    super(table, context, config, instantTime);
-  }
-
-  @Override
-  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
-    try {
-      List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
-          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
-      List<HoodieRollbackStat> rollbackStats = context.map(markerPaths, markerFilePath -> {
-        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
-        IOType type = IOType.valueOf(typeStr);
-        switch (type) {
-          case MERGE:
-            return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          case APPEND:
-            return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
-          case CREATE:
-            return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
-          default:
-            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
-        }
-      }, 0);
-
-      return rollbackStats.stream().map(rollbackStat -> Pair.of(rollbackStat.getPartitionPath(), rollbackStat))
-          .collect(Collectors.groupingBy(Pair::getKey))
-          .values()
-          .stream()
-          .map(x -> x.stream().map(y -> y.getValue()).reduce(RollbackUtils::mergeRollbackStat).get())
-          .collect(Collectors.toList());
-    } catch (Exception e) {
-      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
-    }
-  }
-}
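
The strategy deleted above keys its dispatch off the marker file's trailing suffix. A
self-contained illustration of that parsing, using a hypothetical marker name (marker files
typically follow <data-file-name>.marker.<IOType>):

    import org.apache.hudi.common.model.IOType;

    // Hypothetical marker path, for illustration only.
    String markerFilePath = "2021/09/15/file1_1-0-1_20210915000000.parquet.marker.APPEND";
    String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); // "APPEND"
    IOType type = IOType.valueOf(typeStr); // APPEND -> undoAppend(...) in the execute() above
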
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index d869ec7..ad1d7cd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
@@ -74,6 +75,14 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> {
+      Pair<K, V> pair = mapToPairFunc.call(input);
+      return new Tuple2<>(pair.getLeft(), pair.getRight());
+    }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
+  }
+
+  @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
     return javaSparkContext.parallelize(data, parallelism).flatMap(x -> func.apply(x).iterator()).collect();
   }
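
Unlike the Java engine, whose override runs a parallel stream and never reads the parallelism
argument, the Spark override uses it to size the RDD before the shuffle. A minimal,
hypothetical usage with plain types, assuming a HoodieSparkEngineContext named context:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hudi.common.util.collection.Pair;

    // Hypothetical word count: map each element to (key, 1), then sum per key.
    List<Integer> totals = context.mapToPairAndReduceByKey(
        Arrays.asList("a", "b", "a", "a"),
        word -> Pair.of(word, 1), // SerializablePairFunction<String, String, Integer>
        Integer::sum,             // merge counts for the same key
        2);                       // parallelize(data, 2): two RDD partitions feeding reduceByKey
    // totals holds the per-key sums, e.g. 3 (for "a") and 1 (for "b"), in unspecified order.
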
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 6a2bd6f..26d14cf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -65,7 +65,7 @@ import org.apache.hudi.table.action.commit.SparkMergeHelper;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
-import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -247,7 +247,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
-    return new SparkCopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+    return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 997116e..2db4eeb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -49,7 +49,8 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExec
 import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
-import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
+
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
@@ -146,7 +147,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
                                          String rollbackInstantTime,
                                          HoodieInstant commitInstant,
                                          boolean deleteInstants) {
-    return new SparkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+    return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
index 101b321..9c6ec6e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 
 import org.apache.spark.api.java.JavaRDD;
 
@@ -49,7 +49,7 @@ public class SparkCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload
   @Override
   protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
     table.getMetaClient().reloadActiveTimeline();
-    SparkCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(
+    CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
         (HoodieSparkEngineContext) context,
         config,
         table,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
index c320579..ebca1fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
 import org.apache.spark.api.java.JavaRDD;
 
@@ -48,7 +48,7 @@ public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload
   @Override
   protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
     table.getMetaClient().reloadActiveTimeline();
-    SparkMergeOnReadRollbackActionExecutor rollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor(
+    MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
         context,
         config,
         table,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
deleted file mode 100644
index fcb3882..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-
-/**
- * Performs Rollback of Hoodie Tables.
- */
-public class ListingBasedRollbackHelper implements Serializable {
-
-  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
-
-  private final HoodieTableMetaClient metaClient;
-  private final HoodieWriteConfig config;
-
-  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    this.metaClient = metaClient;
-    this.config = config;
-  }
-
-  /**
-   * Performs, in parallel, all the rollback actions that we have collected.
-   */
-  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
-    JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, true);
-    return partitionPathRollbackStatsPairRDD.reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
-  }
-
-  /**
-   * Collects all file info that needs to be rolled back.
-   */
-  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
-    JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, false);
-    return partitionPathRollbackStatsPairRDD.map(Tuple2::_2).collect();
-  }
-
-  /**
-   * Deletes the files of interest and collects stats, or only collects stats, depending on {@code doDelete}.
-   *
-   * @param context instance of {@link HoodieEngineContext} to use.
-   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or stats collection is requested.
-   * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
-   * @param sparkPartitions number of spark partitions to use for parallelism.
-   * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
-   * @return stats collected with or w/o actual deletions.
-   */
-  JavaPairRDD<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests,
-                                                                     int sparkPartitions, boolean doDelete) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
-      switch (rollbackRequest.getType()) {
-        case DELETE_DATA_FILES_ONLY: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
-              rollbackRequest.getPartitionPath(), doDelete);
-          return new Tuple2<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case DELETE_DATA_AND_LOG_FILES: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
-          return new Tuple2<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case APPEND_ROLLBACK_BLOCK: {
-          String fileId = rollbackRequest.getFileId().get();
-          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
-
-          // collect all log files that are supposed to be deleted by this rollback
-          Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
-              FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
-              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
-              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-
-          Writer writer = null;
-          try {
-            writer = HoodieLogFormat.newWriterBuilder()
-                .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
-                .withFileId(fileId)
-                .overBaseCommit(latestBaseInstant)
-                .withFs(metaClient.getFs())
-                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-            // generate metadata
-            if (doDelete) {
-              Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
-              // append a command block instructing readers to roll back the previously written data block
-              writer.appendBlock(new HoodieCommandBlock(header));
-            }
-          } catch (IOException | InterruptedException io) {
-            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
-          } finally {
-            try {
-              if (writer != null) {
-                writer.close();
-              }
-            } catch (IOException io) {
-              throw new HoodieIOException("Error appending rollback block.", io);
-            }
-          }
-
-          // This step is intentionally done after writer is closed. Guarantees that
-          // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
-          // cloud-storage : HUDI-168
-          Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-              metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-              1L
-          );
-
-          return new Tuple2<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-                  .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
-        }
-        default:
-          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
-      }
-    });
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-      String commit, String partitionPath, boolean doDelete) throws IOException {
-    LOG.info("Cleaning path " + partitionPath);
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    SerializablePathFilter filter = (path) -> {
-      if (path.toString().endsWith(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      } else if (FSUtils.isLogFile(path)) {
-        // Since the baseCommitTime is the only commit for new log files, it's okay here
-        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    FileSystem fs = metaClient.getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-      String commit, String partitionPath, boolean doDelete) throws IOException {
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    LOG.info("Cleaning path " + partitionPath);
-    FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  private Map<HeaderMetadataType, String> generateHeader(String commit) {
-    // generate metadata
-    Map<HeaderMetadataType, String> header = new HashMap<>(3);
-    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
-    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
-    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    return header;
-  }
-
-  public interface SerializablePathFilter extends PathFilter, Serializable {
-
-  }
-}
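
Both helpers above hinge on the same convention: Hudi encodes the commit time in every data file name, so rollback can pick out files by parsing names instead of reading contents. A minimal sketch of that matching rule, using only the FSUtils calls seen above (the class and helper names here are illustrative, not from the commit):

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.fs.FSUtils;

    class RollbackFileMatcher {  // illustrative sketch, not part of the commit
      // Does this file belong to the commit being rolled back?
      static boolean writtenBy(Path path, String commit, String baseFileExtension) {
        if (path.toString().endsWith(baseFileExtension)) {
          // Base file names embed the commit time that wrote them.
          return commit.equals(FSUtils.getCommitTime(path.getName()));
        }
        if (FSUtils.isLogFile(path)) {
          // Log file names embed the base commit time they were appended against.
          return commit.equals(FSUtils.getBaseCommitTimeFromLogPath(path));
        }
        return false;
      }
    }
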
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
deleted file mode 100644
index 611ec21..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.api.java.JavaRDD;
-
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCopyOnWriteRollbackActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-  public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants);
-  }
-
-  public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants,
-                                                boolean skipTimelinePublish,
-                                                boolean useMarkerBasedStrategy) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
-  }
-
-  @Override
-  protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
-    if (useMarkerBasedStrategy) {
-      return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime);
-    } else {
-      return this::executeRollbackUsingFileListing;
-    }
-  }
-
-  @Override
-  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
-        table.getMetaClient().getBasePath(), config);
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
deleted file mode 100644
index 0adacd2..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
-import org.apache.hudi.table.marker.WriteMarkers;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-  public SparkMarkerBasedRollbackStrategy(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
-    super(table, context, config, instantTime);
-  }
-
-  @Override
-  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    try {
-      List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
-          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
-      int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
-      jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
-      return jsc.parallelize(markerPaths, parallelism)
-          .map(markerFilePath -> {
-            String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
-            IOType type = IOType.valueOf(typeStr);
-            switch (type) {
-              case MERGE:
-                return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
-              case APPEND:
-                return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
-              case CREATE:
-                return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
-              default:
-                throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
-            }
-          })
-          .mapToPair(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat))
-          .reduceByKey(RollbackUtils::mergeRollbackStat)
-          .map(Tuple2::_2).collect();
-    } catch (Exception e) {
-      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
-    }
-  }
-
-  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
-    // collect all log files that is supposed to be deleted with this rollback
-    return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
-        FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
-        .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java
deleted file mode 100644
index 9486362..0000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import java.io.IOException;
-import java.util.List;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
-    BaseMergeOnReadRollbackActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-  public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants);
-  }
-
-  public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
-                                                HoodieWriteConfig config,
-                                                HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                                String instantTime,
-                                                HoodieInstant commitInstant,
-                                                boolean deleteInstants,
-                                                boolean skipTimelinePublish,
-                                                boolean useMarkerBasedStrategy) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
-  }
-
-  @Override
-  protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
-    if (useMarkerBasedStrategy) {
-      return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime);
-    } else {
-      return this::executeRollbackUsingFileListing;
-    }
-  }
-
-  @Override
-  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
-    List<ListingBasedRollbackRequest> rollbackRequests;
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    try {
-      rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
-    }
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
-  }
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index bc1f3c3..810733c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -83,8 +83,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
 
     // execute CopyOnWriteRollbackActionExecutor with filelisting mode
-    SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
-    assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
+    assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
     List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
 
     // assert hoodieRollbackStats
@@ -162,11 +162,11 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
       commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
     }
 
-    SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
     if (!isUsingMarkers) {
-      assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+      assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
     } else {
-      assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+      assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
     }
     Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 75e6a7a..5d269cf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -89,7 +89,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
 
     //2. rollback
     HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
-    SparkMergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor(
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
         context,
         cfg,
         table,
@@ -98,9 +98,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
         true);
     // assert is filelist mode
     if (!isUsingMarkers) {
-      assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+      assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
     } else {
-      assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy);
+      assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
     }
 
     //3. assert the rollback stat
@@ -145,15 +145,15 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
   public void testFailForCompletedInstants() {
     Assertions.assertThrows(IllegalArgumentException.class, () -> {
       HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
-      new SparkMergeOnReadRollbackActionExecutor(
-              context,
-              getConfigBuilder().build(),
-              getHoodieTable(metaClient, getConfigBuilder().build()),
-              "003",
-              rollBackInstant,
-              true,
-              true,
-              true);
+      new MergeOnReadRollbackActionExecutor(
+          context,
+          getConfigBuilder().build(),
+          getHoodieTable(metaClient, getConfigBuilder().build()),
+          "003",
+          rollBackInstant,
+          true,
+          true,
+          true);
     });
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 6e67386..94fa697 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy;
+import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -93,7 +93,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
         .withMarkerFile("partA", f2, IOType.CREATE);
 
     // when
-    List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
+    List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
         .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
 
     // then: ensure files are deleted correctly, non-existent files reported as failed deletes
@@ -176,7 +176,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     writeStatuses.collect();
 
     // rollback 2nd commit and ensure stats reflect the info.
-    return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
+    return new MarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
         .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 0128ce5..8ea6a43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.engine;
 
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
@@ -56,6 +57,9 @@ public abstract class HoodieEngineContext {
 
   public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
 
+  public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
+      List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
+
   public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
 
   public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
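
This new primitive is what lets the rollback executors above move out of the engine-specific modules: each HoodieEngineContext supplies its own group-by-key-and-merge in place of Spark's mapToPair/reduceByKey pair. A hedged sketch of how it can stand in for the deleted Spark pipeline that merged per-partition rollback stats (illustrative helper; assumes RollbackUtils.mergeRollbackStat is accessible from the call site):

    import java.util.List;
    import org.apache.hudi.common.HoodieRollbackStat;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.util.collection.Pair;

    // Illustrative: collapse per-file rollback stats into one stat per partition,
    // standing in for mapToPair(...).reduceByKey(RollbackUtils::mergeRollbackStat).
    static List<HoodieRollbackStat> mergeByPartition(
        HoodieEngineContext context, List<HoodieRollbackStat> stats, int parallelism) {
      return context.mapToPairAndReduceByKey(
          stats,
          stat -> Pair.of(stat.getPartitionPath(), stat),  // key each stat by partition path
          RollbackUtils::mergeRollbackStat,                // merge stats within a partition
          parallelism);
    }
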
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index e804567..0aeb9d8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.engine;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
@@ -37,6 +38,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWra
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper;
 
 /**
  * A java based engine context, use this implementation on the query engine integrations if needed.
@@ -57,6 +59,15 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
   }
 
   @Override
+  public <I, K, V> List<V> mapToPairAndReduceByKey(
+      List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
+        .collect(Collectors.toList());
+  }
+
+  @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
index 405f57e..b729e48 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.function;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 
+import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -70,4 +71,14 @@ public class FunctionWrapper {
       }
     };
   }
+
+  public static <V> BinaryOperator<V> throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) {
+    return (v1, v2) -> {
+      try {
+        return throwingReduceFunction.apply(v1, v2);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing reduce", e);
+      }
+    };
+  }
 }
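
throwingReduceWrapper mirrors the existing wrappers in this class: it adapts a SerializableBiFunction into a plain java.util.function.BinaryOperator, rethrowing any exception as a HoodieException, so the function can be handed to Stream.reduce. A small self-contained example (illustrative, not from the commit):

    import java.util.Optional;
    import java.util.function.BinaryOperator;
    import java.util.stream.Stream;
    import org.apache.hudi.common.function.FunctionWrapper;

    public class ReduceWrapperDemo {  // illustrative sketch
      public static void main(String[] args) {
        BinaryOperator<Long> sum = FunctionWrapper.throwingReduceWrapper((a, b) -> a + b);
        Optional<Long> total = Stream.of(1L, 2L, 3L).reduce(sum);
        System.out.println(total.get());  // prints 6
      }
    }
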
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java
new file mode 100644
index 0000000..940396c
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that accepts two arguments and produces a result.
+ *
+ * @param <T> the type of the first argument to the function
+ * @param <U> the type of the second argument to the function
+ * @param <R> the type of the result of the function
+ */
+@FunctionalInterface
+public interface SerializableBiFunction<T, U, R> extends Serializable {
+  R apply(T t, U u);
+}
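
Extending Serializable is the whole point of this interface: a lambda or method reference targeting it compiles to a serializable object, so the same reduce function can be shipped to Spark executors or applied on a local stream unchanged. A tiny self-contained check (illustrative, not part of the commit):

    import org.apache.hudi.common.function.SerializableBiFunction;

    public class BiFunctionDemo {  // illustrative sketch
      public static void main(String[] args) {
        // Any two-argument lambda or method reference satisfies the interface.
        SerializableBiFunction<Integer, Integer, Integer> max = Math::max;
        System.out.println(max.apply(3, 7));                      // 7
        System.out.println(max instanceof java.io.Serializable);  // true
      }
    }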