Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/14 09:04:45 UTC

[hudi] branch HUDI-3406-revert created (now de4d5aa429)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a change to branch HUDI-3406-revert
in repository https://gitbox.apache.org/repos/asf/hudi.git


      at de4d5aa429 Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)"

This branch includes the following new commits:

     new de4d5aa429 Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hudi] 01/01: Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)"

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch HUDI-3406-revert
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit de4d5aa429f0aaaf1bb86123238fd40c69e49d85
Author: Raymond Xu <xu...@gmail.com>
AuthorDate: Thu Apr 14 17:03:46 2022 +0800

    Revert "[HUDI-3406] Rollback incorrectly relying on FS listing instead of Com… (#4957)"
    
    This reverts commit 98b4e9796e1e3e1f69954afa698ace5b28bde4a0.
---
 .../hudi/client/utils/MetadataConversionUtils.java |  17 +-
 .../table/action/rollback/BaseRollbackHelper.java  |   5 +
 .../rollback/ListingBasedRollbackHelper.java       | 150 ++++++++++
 .../rollback/ListingBasedRollbackStrategy.java     | 302 ++-------------------
 .../hudi/table/action/rollback/RollbackUtils.java  | 167 ++++++++++++
 .../action/rollback/SerializablePathFilter.java    |  26 --
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  34 ++-
 .../TestMergeOnReadRollbackActionExecutor.java     |   5 +-
 .../hudi/common/model/HoodieCommitMetadata.java    |  13 -
 9 files changed, 368 insertions(+), 351 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 342de74a11..c0405161d8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.utils;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -33,7 +34,6 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -186,19 +186,6 @@ public class MetadataConversionUtils {
     return Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get()));
   }
 
-  public static Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant hoodieInstant) throws IOException {
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
-
-    if (hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      return Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
-          HoodieReplaceCommitMetadata.class));
-    }
-    return Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
-        HoodieCommitMetadata.class));
-
-  }
-
   public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
           HoodieCommitMetadata hoodieCommitMetadata) {
     ObjectMapper mapper = new ObjectMapper();
@@ -213,4 +200,4 @@ public class MetadataConversionUtils {
     avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
     return avroMetaData;
   }
-}
+}
\ No newline at end of file
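
For context, the removed getHoodieCommitMetadata helper above only wrapped a direct
timeline read. A minimal sketch of the equivalent call sequence, assuming the instant
is completed and present on the active timeline:

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    import java.io.IOException;

    class CommitMetadataReadSketch {
      static HoodieCommitMetadata read(HoodieTableMetaClient metaClient, HoodieInstant instant)
          throws IOException {
        // Only completed commits carry full metadata on the timeline.
        HoodieTimeline timeline =
            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        byte[] details = timeline.getInstantDetails(instant).get();
        // Replace commits deserialize into a dedicated metadata subclass.
        if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
          return HoodieReplaceCommitMetadata.fromBytes(details, HoodieReplaceCommitMetadata.class);
        }
        return HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class);
      }
    }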
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index 8475afe16e..189de373d9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -213,4 +214,8 @@ public class BaseRollbackHelper implements Serializable {
         String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
     return header;
   }
+
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+
+  }
 }
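
The SerializablePathFilter nested above matters because a plain PathFilter lambda is
not Serializable and so cannot be captured by closures that engines ship to executors.
A minimal sketch of declaring one; the .parquet extension check is illustrative only:

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.table.action.rollback.BaseRollbackHelper;

    class FilterSketch {
      // Typing the lambda as the serializable sub-interface keeps it safe to
      // serialize inside a distributed listStatus task.
      static final BaseRollbackHelper.SerializablePathFilter PARQUET_ONLY =
          (Path path) -> path.getName().endsWith(".parquet");
    }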
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
new file mode 100644
index 0000000000..628b2fc372
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
+/**
+ * Performs Rollback of Hoodie Tables.
+ */
+public class ListingBasedRollbackHelper implements Serializable {
+  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final HoodieWriteConfig config;
+
+  public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Collects info for Rollback plan.
+   */
+  public List<HoodieRollbackRequest> getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
+    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback Plan");
+    return getListingBasedRollbackRequests(context, instantToRollback, rollbackRequests, sparkPartitions);
+  }
+
+  /**
+   * Either deletes the files of interest and collects stats, or only collects stats.
+   *
+   * @param context           instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
+   * @param numPartitions     number of spark partitions to use for parallelism.
+   * @return stats collected with or w/o actual deletions.
+   */
+  private List<HoodieRollbackRequest> getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                                      List<ListingBasedRollbackRequest> rollbackRequests, int numPartitions) {
+    return context.map(rollbackRequests, rollbackRequest -> {
+      switch (rollbackRequest.getType()) {
+        case DELETE_DATA_FILES_ONLY: {
+          final FileStatus[] filesToDeletedStatus = getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
+              EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
+        }
+        case DELETE_DATA_AND_LOG_FILES: {
+          final FileStatus[] filesToDeletedStatus = getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          String fileId = rollbackRequest.getFileId().get();
+          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+          HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get();
+
+          Path fullLogFilePath = FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath());
+
+          Map<String, Long> logFilesWithBlocksToRollback =
+              Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes());
+
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant,
+              Collections.EMPTY_LIST, logFilesWithBlocksToRollback);
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+      }
+    }, numPartitions);
+  }
+
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+                                               String commit, String partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String partitionPath, FileSystem fs) throws IOException {
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    BaseRollbackHelper.SerializablePathFilter filter = (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+  }
+}
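
Both delete branches above strip the URI scheme by keeping everything after the first
colon. A self-contained illustration; the path is made up:

    public class StripSchemeExample {
      public static void main(String[] args) {
        String full = "file:/var/folders/2022/04/14/f1_1-0-1_20220414090445.parquet";
        // Keep everything after the first ':', i.e. drop the "file" scheme.
        String stripped = full.substring(full.indexOf(":") + 1);
        System.out.println(stripped); // /var/folders/2022/04/14/f1_1-0-1_20220414090445.parquet
      }
    }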
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index ed37798607..e6355526e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -18,42 +18,19 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.jetbrains.annotations.NotNull;
 
-import static org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
-import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+import java.io.IOException;
+import java.util.List;
 
 /**
  * Listing based rollback strategy to fetch list of {@link HoodieRollbackRequest}s.
@@ -62,15 +39,12 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
 
   private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackStrategy.class);
 
-  protected final HoodieTable<?, ?, ?, ?> table;
-
-  protected final transient HoodieEngineContext context;
-
+  protected final HoodieTable table;
+  protected final HoodieEngineContext context;
   protected final HoodieWriteConfig config;
-
   protected final String instantTime;
 
-  public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table,
+  public ListingBasedRollbackStrategy(HoodieTable table,
                                       HoodieEngineContext context,
                                       HoodieWriteConfig config,
                                       String instantTime) {
@@ -83,260 +57,20 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback) {
     try {
-      HoodieTableMetaClient metaClient = table.getMetaClient();
-      List<String> partitionPaths =
-          FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
-      int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1);
-
-      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan");
-
-      HoodieTableType tableType = table.getMetaClient().getTableType();
-      String baseFileExtension = getBaseFileExtension(metaClient);
-      Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(metaClient, instantToRollback);
-      Boolean isCommitMetadataCompleted = checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
-
-      return context.flatMap(partitionPaths, partitionPath -> {
-        List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
-        FileStatus[] filesToDelete =
-            fetchFilesFromInstant(instantToRollback, partitionPath, metaClient.getBasePath(), baseFileExtension,
-                metaClient.getFs(), commitMetadataOptional, isCommitMetadataCompleted);
-
-        if (HoodieTableType.COPY_ON_WRITE == tableType) {
-          hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
-        } else if (HoodieTableType.MERGE_ON_READ == tableType) {
-          String commit = instantToRollback.getTimestamp();
-          HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
-          switch (instantToRollback.getAction()) {
-            case HoodieTimeline.COMMIT_ACTION:
-            case HoodieTimeline.REPLACE_COMMIT_ACTION:
-              hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
-              break;
-            case HoodieTimeline.COMPACTION_ACTION:
-              // If there is no delta commit present after the current commit (if compaction), no action, else we
-              // need to make sure that a compaction commit rollback also deletes any log files written as part of the
-              // succeeding deltacommit.
-              boolean higherDeltaCommits =
-                  !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1)
-                      .empty();
-              if (higherDeltaCommits) {
-                // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
-                // and has not yet finished. In this scenario we should delete only the newly created base files
-                // and not corresponding base commit log files created with this as baseCommit since updates would
-                // have been written to the log files.
-                hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
-                    listFilesToBeDeleted(instantToRollback.getTimestamp(), baseFileExtension, partitionPath,
-                        metaClient.getFs())));
-              } else {
-                // No deltacommits present after this compaction commit (inflight or requested). In this case, we
-                // can also delete any log files that were created with this compaction commit as base
-                // commit.
-                hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
-              }
-              break;
-            case HoodieTimeline.DELTA_COMMIT_ACTION:
-              // --------------------------------------------------------------------------------------------------
-              // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
-              // --------------------------------------------------------------------------------------------------
-              // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
-              // this scenario we would want to delete these log files.
-              // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
-              // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
-              // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
-              // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
-              // and hence will end up deleting these log files. This is done so there are no orphan log files
-              // lying around.
-              // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
-              // taken in this scenario is a combination of (A.2) and (A.3)
-              // ---------------------------------------------------------------------------------------------------
-              // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
-              // ---------------------------------------------------------------------------------------------------
-              // (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries.
-              // In this scenario, we delete all the base files written for the failed commit.
-              // (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In
-              // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
-              // (B.3) Rollback triggered for first commit - Same as (B.1)
-              // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
-              // as well if the base base file gets deleted.
-              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-                  table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
-                  HoodieCommitMetadata.class);
-
-              // In case all data was inserts and the commit failed, delete the file belonging to that commit
-              // We do not know fileIds for inserts (first inserts are either log files or base files),
-              // delete all files for the corresponding failed commit, if present (same as COW)
-              hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
-
-              // append rollback blocks for updates and inserts as A.2 and B.2
-              if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
-                hoodieRollbackRequests.addAll(
-                    getRollbackRequestToAppend(partitionPath, instantToRollback, commitMetadata, table));
-              }
-              break;
-            default:
-              throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
-          }
-        } else {
-          throw new HoodieRollbackException(
-              String.format("Unsupported table type: %s, during listing rollback of %s", tableType, instantToRollback));
-        }
-        return hoodieRollbackRequests.stream();
-      }, numPartitions);
-    } catch (Exception e) {
+      List<ListingBasedRollbackRequest> rollbackRequests = null;
+      if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
+            table.getMetaClient().getBasePath());
+      } else {
+        rollbackRequests = RollbackUtils
+            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
+      }
+      List<HoodieRollbackRequest> listingBasedRollbackRequests = new ListingBasedRollbackHelper(table.getMetaClient(), config)
+          .getRollbackRequestsForRollbackPlan(context, instantToRollback, rollbackRequests);
+      return listingBasedRollbackRequests;
+    } catch (IOException e) {
       LOG.error("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
     }
   }
-
-  private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
-    return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-  }
-
-  @NotNull
-  private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath, FileStatus[] filesToDeletedStatus) {
-    List<String> filesToDelete = getFilesToBeDeleted(filesToDeletedStatus);
-    return new HoodieRollbackRequest(
-        partitionPath, EMPTY_STRING, EMPTY_STRING, filesToDelete, Collections.emptyMap());
-  }
-
-  @NotNull
-  private List<String> getFilesToBeDeleted(FileStatus[] dataFilesToDeletedStatus) {
-    return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
-      String dataFileToBeDeleted = fileStatus.getPath().toString();
-      // strip scheme E.g: file:/var/folders
-      return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
-    }).collect(Collectors.toList());
-  }
-
-  private FileStatus[] listFilesToBeDeleted(String commit, String basefileExtension, String partitionPath,
-                                            FileSystem fs) throws IOException {
-    LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-  }
-
-  private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, String partitionPath, String basePath,
-                                             String baseFileExtension, HoodieWrapperFileSystem fs,
-                                             Option<HoodieCommitMetadata> commitMetadataOptional,
-                                             Boolean isCommitMetadataCompleted) throws IOException {
-    if (isCommitMetadataCompleted) {
-      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadataOptional.get(),
-          baseFileExtension, fs);
-    } else {
-      return fetchFilesFromListFiles(instantToRollback, partitionPath, basePath, baseFileExtension, fs);
-    }
-  }
-
-  private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant instantToRollback, String partitionPath,
-                                                    String basePath, HoodieCommitMetadata commitMetadata,
-                                                    String baseFileExtension, HoodieWrapperFileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath);
-
-    return fs.listStatus(filePaths, pathFilter);
-  }
-
-  private FileStatus[] fetchFilesFromListFiles(HoodieInstant instantToRollback, String partitionPath, String basePath,
-                                               String baseFileExtension, HoodieWrapperFileSystem fs)
-      throws IOException {
-    SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
-    Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
-
-    return fs.listStatus(filePaths, pathFilter);
-  }
-
-  private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
-                                               Option<HoodieCommitMetadata> commitMetadataOptional) {
-    return commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
-        && !WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType());
-  }
-
-  private static Path[] listFilesToBeDeleted(String basePath, String partitionPath) {
-    return new Path[] {FSUtils.getPartitionPath(basePath, partitionPath)};
-  }
-
-  private static Path[] getFilesFromCommitMetadata(String basePath, HoodieCommitMetadata commitMetadata, String partitionPath) {
-    List<String> fullPaths = commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
-    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
-  }
-
-  @NotNull
-  private static SerializablePathFilter getSerializablePathFilter(String basefileExtension, String commit) {
-    return (path) -> {
-      if (path.toString().endsWith(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      } else if (FSUtils.isLogFile(path)) {
-        // Since the baseCommitTime is the only commit for new log files, it's okay here
-        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-  }
-
-  public static List<HoodieRollbackRequest> getRollbackRequestToAppend(String partitionPath, HoodieInstant rollbackInstant,
-                                                                       HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
-    List<HoodieRollbackRequest> hoodieRollbackRequests =  new ArrayList<>();
-    checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
-
-    // wStat.getPrevCommit() might not give the right commit time in the following
-    // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
-    // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
-    // But the index (global) might store the baseCommit of the base and not the requested, hence get the
-    // baseCommit always by listing the file slice
-    // With multi writers, rollbacks could be lazy. and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
-    Map<String, FileSlice> latestFileSlices = table.getSliceView()
-        .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true)
-        .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
-
-    List<HoodieWriteStat> hoodieWriteStats = commitMetadata.getPartitionToWriteStats().get(partitionPath)
-        .stream()
-        .filter(writeStat -> {
-          // Filter out stats without prevCommit since they are all inserts
-          boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
-              && (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId());
-
-          if (!validForRollback) {
-            return false;
-          }
-
-          FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
-
-          // For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back
-          checkArgument(
-              HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
-                  HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()),
-              "Log-file base-instant could not be less than the instant being rolled back");
-
-          // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
-          // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less
-          // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up
-          // in a different branch of the flow.
-          return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
-        })
-        .collect(Collectors.toList());
-
-    for (HoodieWriteStat writeStat : hoodieWriteStats) {
-      FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
-      String fileId = writeStat.getFileId();
-      String latestBaseInstant = latestFileSlice.getBaseInstantTime();
-
-      Path fullLogFilePath = FSUtils.getPartitionPath(table.getConfig().getBasePath(), writeStat.getPath());
-
-      Map<String, Long> logFilesWithBlocksToRollback =
-          Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes());
-
-      hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant,
-          Collections.emptyList(), logFilesWithBlocksToRollback));
-    }
-
-    return hoodieRollbackRequests;
-  }
 }
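
After this revert, the strategy above is a thin dispatcher: COW tables get per-partition
delete requests, MOR tables get action-specific requests from RollbackUtils. A hedged
usage sketch; the table, context, config, and instants are assumed to be in hand:

    // Variable names are illustrative; construction mirrors the signature above.
    ListingBasedRollbackStrategy strategy =
        new ListingBasedRollbackStrategy(table, context, config, instantTime);
    List<HoodieRollbackRequest> requests = strategy.getRollbackRequests(instantToRollback);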
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index ce7a185151..2bc9b59b0d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -21,13 +21,21 @@ package org.apache.hudi.table.action.rollback;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -36,6 +44,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
@@ -91,4 +102,160 @@ public class RollbackUtils {
     return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
   }
 
+  /**
+   * Generates all rollback requests needed to roll back this action, without actually performing the rollback, for the COW table type.
+   * @param engineContext instance of {@link HoodieEngineContext} to use.
+   * @param basePath base path of interest.
+   * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
+   */
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String basePath) {
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, false, false).stream()
+        .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Generates all rollback requests needed to roll back this action, without actually performing the rollback, for the MOR table type.
+   *
+   * @param instantToRollback Instant to Rollback
+   * @param table instance of {@link HoodieTable} to use.
+   * @param context instance of {@link HoodieEngineContext} to use.
+   * @return list of rollback requests
+   */
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
+    String commit = instantToRollback.getTimestamp();
+    HoodieWriteConfig config = table.getConfig();
+    List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
+    if (partitions.isEmpty()) {
+      return new ArrayList<>();
+    }
+    int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
+    return context.flatMap(partitions, partitionPath -> {
+      HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
+      List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
+      switch (instantToRollback.getAction()) {
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+          LOG.info("Rolling back commit action.");
+          partitionRollbackRequests.add(
+              ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+          break;
+        case HoodieTimeline.COMPACTION_ACTION:
+          // If there is no delta commit present after the current commit (if compaction), no action, else we
+          // need to make sure that a compaction commit rollback also deletes any log files written as part of the
+          // succeeding deltacommit.
+          boolean higherDeltaCommits =
+              !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
+          if (higherDeltaCommits) {
+            // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
+            // and has not yet finished. In this scenario we should delete only the newly created base files
+            // and not corresponding base commit log files created with this as baseCommit since updates would
+            // have been written to the log files.
+            LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
+            partitionRollbackRequests.add(
+                ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath));
+          } else {
+            // No deltacommits present after this compaction commit (inflight or requested). In this case, we
+            // can also delete any log files that were created with this compaction commit as base
+            // commit.
+            LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
+                + " log files");
+            partitionRollbackRequests.add(
+                ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+          }
+          break;
+        case HoodieTimeline.DELTA_COMMIT_ACTION:
+          // --------------------------------------------------------------------------------------------------
+          // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
+          // --------------------------------------------------------------------------------------------------
+          // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
+          // this scenario we would want to delete these log files.
+          // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
+          // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
+          // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
+          // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
+          // hence will end up deleting these log files. This is done so there are no orphan log files
+          // lying around.
+          // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
+          // taken in this scenario is a combination of (A.2) and (A.3)
+          // ---------------------------------------------------------------------------------------------------
+          // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
+          // ---------------------------------------------------------------------------------------------------
+          // (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries.
+          // In this scenario, we delete all the base files written for the failed commit.
+          // (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In
+          // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
+          // (B.3) Rollback triggered for first commit - Same as (B.1)
+          // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
+          // as well if the base file gets deleted.
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+              table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+              HoodieCommitMetadata.class);
+
+          // In case all data was inserts and the commit failed, delete the file belonging to that commit
+          // We do not know fileIds for inserts (first inserts are either log files or base files),
+          // delete all files for the corresponding failed commit, if present (same as COW)
+          partitionRollbackRequests.add(
+              ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
+
+          // append rollback blocks for updates and inserts as A.2 and B.2
+          if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+            partitionRollbackRequests
+                .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table));
+          }
+          break;
+        default:
+          break;
+      }
+      return partitionRollbackRequests.stream();
+    }, Math.min(partitions.size(), sparkPartitions)).stream().filter(Objects::nonNull).collect(Collectors.toList());
+  }
+
+  private static List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
+      HoodieCommitMetadata commitMetadata, HoodieTable table) {
+    checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+
+    // wStat.getPrevCommit() might not give the right commit time in the following
+    // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
+    // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
+    // But the index (global) might store the baseCommit of the base and not the requested, hence get the
+    // baseCommit always by listing the file slice
+    // With multi writers, rollbacks could be lazy, and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
+    Map<String, FileSlice> latestFileSlices = table.getSliceView()
+        .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true)
+        .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
+
+    return commitMetadata.getPartitionToWriteStats().get(partitionPath)
+        .stream()
+        .filter(writeStat -> {
+          // Filter out stats without prevCommit since they are all inserts
+          boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
+              && (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId());
+
+          if (!validForRollback) {
+            return false;
+          }
+
+          FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
+
+          // For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back
+          checkArgument(
+              HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
+                  HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()),
+              "Log-file base-instant could not be less than the instant being rolled back");
+
+          // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
+          // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less
+          // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up
+          // in a different branch of the flow.
+          return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
+        })
+        .map(writeStat -> {
+          FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
+          return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath,
+              writeStat.getFileId(), latestFileSlice.getBaseInstantTime(), writeStat);
+        })
+        .collect(Collectors.toList());
+  }
 }
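
Both request generators clamp their parallelism the same way: never more tasks than
partitions, never fewer than one. A quick worked example:

    public class ParallelismClamp {
      public static void main(String[] args) {
        int partitions = 3;   // partition paths to roll back
        int configured = 100; // e.g. config.getRollbackParallelism()
        // min(3, 100) = 3, then max(3, 1) = 3
        int parallelism = Math.max(Math.min(partitions, configured), 1);
        System.out.println(parallelism); // 3
      }
    }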
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
deleted file mode 100644
index e2affdf5ca..0000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hadoop.fs.PathFilter;
-
-import java.io.Serializable;
-
-public interface SerializablePathFilter extends PathFilter, Serializable {
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 42add690f2..6a114154c8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,14 +18,14 @@
 
 package org.apache.hudi.table.upgrade;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -35,10 +35,15 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
+import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -95,7 +100,14 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
         writeMarkers.quietDeleteMarkerDir(context, parallelism);
 
         // generate rollback stats
-        List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table, context, commitInstantOpt);
+        List<ListingBasedRollbackRequest> rollbackRequests;
+        if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
+          rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath());
+        } else {
+          rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
+        }
+        List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table.getMetaClient(), table.getConfig(),
+            context, commitInstantOpt, rollbackRequests);
 
         // recreate markers adhering to marker based rollback
         for (HoodieRollbackStat rollbackStat : rollbackStats) {
@@ -114,12 +126,12 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
     }
   }
 
-  List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable<?, ?, ?, ?> table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
-    List<HoodieRollbackRequest> hoodieRollbackRequests =
-        new ListingBasedRollbackStrategy(table, context, table.getConfig(), commitInstantOpt.get().getTimestamp())
-            .getRollbackRequests(commitInstantOpt.get());
-    return new BaseRollbackHelper(table.getMetaClient(), table.getConfig())
-        .collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
+  List<HoodieRollbackStat> getListBasedRollBackStats(
+      HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
+      Option<HoodieInstant> commitInstantOpt, List<ListingBasedRollbackRequest> rollbackRequests) {
+    List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
+        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
+    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
   }
 
   /**
@@ -131,7 +143,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
    * @param table       {@link HoodieTable} instance to use
    * @return the marker file name thus curated.
    */
-  private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable<?, ?, ?, ?> table) {
+  private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
     Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
     String fileId = FSUtils.getFileIdFromLogPath(logPath);
     String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index d8ce6612a4..c9e3fed871 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -45,6 +45,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -124,8 +125,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
 
     for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
       HoodieRollbackPartitionMetadata meta = entry.getValue();
-      assertEquals(0, meta.getFailedDeleteFiles().size());
-      assertEquals(0, meta.getSuccessDeleteFiles().size());
+      assertTrue(meta.getFailedDeleteFiles() == null || meta.getFailedDeleteFiles().size() == 0);
+      assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0);
     }
 
     //4. assert file group after rollback, and compare to the rollbackstat
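
The assertion change above treats a null delete-file list the same as an empty one. A
small helper expressing that intent once; a sketch, assuming JUnit 5 as the test
already uses:

    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.util.List;

    class NullOrEmptyAssert {
      // Hypothetical helper: null and empty both mean "no files were deleted".
      static void assertNullOrEmpty(List<String> files, String label) {
        assertTrue(files == null || files.isEmpty(), label + " should be null or empty");
      }
    }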
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 53ceb00409..40bc3c8280 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -138,19 +138,6 @@ public class HoodieCommitMetadata implements Serializable {
     return fullPaths;
   }
 
-  public List<String> getFullPathsByPartitionPath(String basePath, String partitionPath) {
-    HashSet<String> fullPaths = new HashSet<>();
-    if (getPartitionToWriteStats().get(partitionPath) != null) {
-      for (HoodieWriteStat stat : getPartitionToWriteStats().get(partitionPath)) {
-        if ((stat.getFileId() != null)) {
-          String fullPath = FSUtils.getPartitionPath(basePath, stat.getPath()).toString();
-          fullPaths.add(fullPath);
-        }
-      }
-    }
-    return new ArrayList<>(fullPaths);
-  }
-
   public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath) {
     Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
     for (Map.Entry<String, List<HoodieWriteStat>> entry : getPartitionToWriteStats().entrySet()) {