Posted to commits@hudi.apache.org by vb...@apache.org on 2020/08/11 08:43:57 UTC

[hudi] branch master updated: [HUDI-808] Support cleaning bootstrap source data (#1870)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b928e9  [HUDI-808] Support cleaning bootstrap source data (#1870)
8b928e9 is described below

commit 8b928e9bca5ecac8f956b17e5d18f2deaf526253
Author: wenningd <we...@gmail.com>
AuthorDate: Tue Aug 11 01:43:46 2020 -0700

    [HUDI-808] Support cleaning bootstrap source data (#1870)
    
    Co-authored-by: Wenning Ding <we...@amazon.com>
    Co-authored-by: Balaji Varadarajan <vb...@apache.org>
---
 .../apache/hudi/config/HoodieCompactionConfig.java |  10 +
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../table/action/clean/CleanActionExecutor.java    |  58 ++--
 .../hudi/table/action/clean/CleanPlanner.java      |  39 ++-
 .../table/action/clean/PartitionCleanStat.java     |  42 ++-
 .../java/org/apache/hudi/table/TestCleaner.java    | 352 +++++++++++++++------
 hudi-common/pom.xml                                |   1 +
 hudi-common/src/main/avro/HoodieCleanMetadata.avsc |  23 +-
 .../HoodieCleanPartitionMetadata.avsc}             |  30 +-
 hudi-common/src/main/avro/HoodieCleanerPlan.avsc   |  28 ++
 .../org/apache/hudi/common/HoodieCleanStat.java    |  57 +++-
 .../CleanFileInfo.java}                            |  34 +-
 .../versioning/clean/CleanMetadataMigrator.java    |   4 +-
 ...r.java => CleanMetadataV1MigrationHandler.java} |   4 +-
 ...r.java => CleanMetadataV2MigrationHandler.java} |   4 +-
 ...etadataMigrator.java => CleanPlanMigrator.java} |  13 +-
 .../clean/CleanPlanV1MigrationHandler.java         |  66 ++++
 .../clean/CleanPlanV2MigrationHandler.java         |  64 ++++
 .../IncrementalTimelineSyncFileSystemView.java     |   3 +-
 .../org/apache/hudi/common/util/CleanerUtils.java  |  42 ++-
 .../hudi/common/bootstrap/TestBootstrapIndex.java  |  22 +-
 .../hudi/common/testutils/HoodieTestUtils.java     |   4 +-
 .../java/org/apache/hudi/client/TestBootstrap.java |   2 +-
 23 files changed, 700 insertions(+), 206 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 1b993b1..08f3774 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -52,6 +52,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
   public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
+  // Set true to clean bootstrap source files when necessary
+  public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "hoodie.cleaner.delete.bootstrap.base.file";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // By default, treat any file <= 100MB as a small file.
@@ -112,6 +114,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
   private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
   private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
+  private static final String DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = "false";
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP =
       "hoodie.compaction.daybased.target.partitions";
   // 500GB of target IO per compaction (both read and write)
@@ -252,6 +255,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withCleanBootstrapBaseFileEnabled(Boolean cleanBootstrapSourceFileEnabled) {
+      props.setProperty(CLEANER_BOOTSTRAP_BASE_FILE_ENABLED, String.valueOf(cleanBootstrapSourceFileEnabled));
+      return this;
+    }
+
     public HoodieCompactionConfig build() {
       HoodieCompactionConfig config = new HoodieCompactionConfig(props);
       setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
@@ -298,6 +306,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
       setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
           COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);
+      setDefaultOnCondition(props, !props.containsKey(CLEANER_BOOTSTRAP_BASE_FILE_ENABLED),
+          CLEANER_BOOTSTRAP_BASE_FILE_ENABLED, DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED);
 
       HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 89efc4e..12026a9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -369,6 +369,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
   }
 
+  public Boolean shouldCleanBootstrapBaseFile() {
+    return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
+  }
+
   /**
    * index properties.
    */
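
Taken together with the HoodieCompactionConfig change above, here is a minimal sketch of how a writer job could opt in to the new behaviour. The cleaner policy, retention count and table path below are illustrative only and are not part of this commit:

    import org.apache.hudi.common.model.HoodieCleaningPolicy;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Opt in to deleting bootstrap source files during cleaning
    // (hoodie.cleaner.delete.bootstrap.base.file, default "false").
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")   // hypothetical table path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCleanBootstrapBaseFileEnabled(true)
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
            .retainCommits(2)
            .build())
        .build();

    // The clean planner consults this flag when building the clean plan.
    boolean cleanBootstrap = writeConfig.shouldCleanBootstrapBaseFile();
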
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index c72a453..5261447 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -23,13 +23,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -82,40 +84,45 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
       jsc.setJobGroup(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
-      Map<String, List<String>> cleanOps = jsc
+      Map<String, List<HoodieCleanFileInfo>> cleanOps = jsc
           .parallelize(partitionsToClean, cleanerParallelism)
           .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)))
           .collect().stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue())));
 
       return new HoodieCleanerPlan(earliestInstant
           .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
-          config.getCleanerPolicy().name(), cleanOps, 1);
+          config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
+          CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to schedule clean operation", e);
     }
   }
 
-  private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
-      HoodieTable table) {
-    return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
+  private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>
+        deleteFilesFunc(HoodieTable table) {
+    return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>, String, PartitionCleanStat>) iter -> {
       Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
-
       FileSystem fs = table.getMetaClient().getFs();
-      Path basePath = new Path(table.getMetaClient().getBasePath());
       while (iter.hasNext()) {
-        Tuple2<String, String> partitionDelFileTuple = iter.next();
+        Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
         String partitionPath = partitionDelFileTuple._1();
-        String delFileName = partitionDelFileTuple._2();
-        Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName);
+        Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
         String deletePathStr = deletePath.toString();
         Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
         if (!partitionCleanStatMap.containsKey(partitionPath)) {
           partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
         }
+        boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
         PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName());
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult);
+        if (isBootstrapBasePathFile) {
+          // For Bootstrap Base file deletions, store the full file path.
+          partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
+          partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
+        } else {
+          partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
+          partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
+        }
       }
       return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue()))
           .collect(Collectors.toList()).iterator();
@@ -145,14 +152,15 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
    */
   List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan) {
     int cleanerParallelism = Math.min(
-        (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-    
+
     jsc.setJobGroup(this.getClass().getSimpleName(), "Perform cleaning of partitions");
     List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
-        .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
-            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
+        .parallelize(cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(),
+              new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))))
             .collect(Collectors.toList()), cleanerParallelism)
         .mapPartitionsToPair(deleteFilesFunc(table))
         .reduceByKey(PartitionCleanStat::merge).collect();
@@ -161,7 +169,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
         .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
 
     // Return PartitionCleanStat for each partition passed.
-    return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
+    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
       PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
           ? partitionCleanStatsMap.get(partitionPath)
           : new PartitionCleanStat(partitionPath);
@@ -175,21 +183,25 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
           .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
           .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
           .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
+          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
+          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
           .build();
     }).collect(Collectors.toList());
   }
 
   /**
    * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
+   * Cleaner Plan contains absolute file paths.
    *
    * @param startCleanTime Cleaner Instant Time
    * @return Cleaner Plan if generated
    */
   Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
     final HoodieCleanerPlan cleanerPlan = requestClean(jsc);
-    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
-        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()
-        && cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+    if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null)
+        && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()
+        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
       // Only create cleaner plan which does some work
       final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
       // Save to both aux and timeline folder
@@ -275,7 +287,7 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
     if (cleanerPlanOpt.isPresent()) {
       table.getMetaClient().reloadActiveTimeline();
       HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
-      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+      if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty()) {
         return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan);
       }
     }
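
For illustration, a rough sketch of the per-partition payload that requestClean(jsc) now assembles for the V2 plan. The partition and file paths are made up; only the types and the CleanFileInfo/CleanerUtils calls come from this change:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hudi.avro.model.HoodieCleanFileInfo;
    import org.apache.hudi.common.model.CleanFileInfo;
    import org.apache.hudi.common.util.CleanerUtils;

    // Each entry now carries an absolute file path plus a bootstrap marker,
    // instead of a bare file name as in the V1 plan.
    List<CleanFileInfo> toDelete = Arrays.asList(
        new CleanFileInfo("/tmp/hoodie/sample-table/2020/08/11/f1_1-0-1_00000000000001.parquet", false),
        new CleanFileInfo("/tmp/bootstrap-source/2020/08/11/part-00000.parquet", true));

    Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
    cleanOps.put("2020/08/11", CleanerUtils.convertToHoodieCleanFileInfoList(toDelete));
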
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 815b41d..dc89126 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.clean;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -28,12 +29,13 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -65,6 +67,10 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
 
   private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
 
+  public static final Integer CLEAN_PLAN_VERSION_1 = CleanPlanV1MigrationHandler.VERSION;
+  public static final Integer CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
+  public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
+
   private final SyncableFileSystemView fileSystemView;
   private final HoodieTimeline commitTimeline;
   private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
@@ -189,11 +195,11 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
    * single file (i.e run it with versionsRetained = 1)
    */
-  private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) {
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
     List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-    List<String> deletePaths = new ArrayList<>();
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
         .flatMap(this::getSavepointedDataFiles)
@@ -224,11 +230,15 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
         FileSlice nextSlice = fileSliceIterator.next();
         if (nextSlice.getBaseFile().isPresent()) {
           HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
-          deletePaths.add(dataFile.getFileName());
+          deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
+          if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+            deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
+          }
         }
         if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
           // If merge on read, then clean the log files for the commits as well
-          deletePaths.addAll(nextSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+          deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+              .collect(Collectors.toList()));
         }
       }
     }
@@ -249,10 +259,10 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
    * <p>
    * This policy is the default.
    */
-  private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) {
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
     int commitsRetained = config.getCleanerCommitsRetained();
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
-    List<String> deletePaths = new ArrayList<>();
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
@@ -297,16 +307,21 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
           if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
               .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
             // this is a commit, that should be cleaned.
-            aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
+            aFile.ifPresent(hoodieDataFile -> {
+              deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
+              if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+                deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+              }
+            });
             if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
               // If merge on read, then clean the log files for the commits as well
-              deletePaths.addAll(aSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                  .collect(Collectors.toList()));
             }
           }
         }
       }
     }
-
     return deletePaths;
   }
 
@@ -329,9 +344,9 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
-  public List<String> getDeletePaths(String partitionPath) {
+  public List<CleanFileInfo> getDeletePaths(String partitionPath) {
     HoodieCleaningPolicy policy = config.getCleanerPolicy();
-    List<String> deletePaths;
+    List<CleanFileInfo> deletePaths;
     if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
       deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
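
A small usage sketch of the reworked planner API: callers now receive CleanFileInfo entries rather than file names. The planner instance, partition path and LOG below are assumed for illustration:

    import org.apache.hudi.common.model.CleanFileInfo;

    for (CleanFileInfo fileInfo : planner.getDeletePaths("2020/08/11")) {
      if (fileInfo.isBootstrapBaseFile()) {
        // Bootstrap source files live outside the Hudi base path,
        // so the absolute path is what gets deleted and recorded.
        LOG.info("Will delete bootstrap source file " + fileInfo.getFilePath());
      } else {
        LOG.info("Will delete file " + fileInfo.getFilePath());
      }
    }
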
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
index 3493ad6..2ff2d96 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
@@ -28,21 +28,36 @@ class PartitionCleanStat implements Serializable {
   private final List<String> deletePathPatterns = new ArrayList<>();
   private final List<String> successDeleteFiles = new ArrayList<>();
   private final List<String> failedDeleteFiles = new ArrayList<>();
+  private final List<String> deleteBootstrapBasePathPatterns = new ArrayList<>();
+  private final List<String> successfulDeleteBootstrapBaseFiles = new ArrayList<>();
+  private final List<String> failedDeleteBootstrapBaseFiles = new ArrayList<>();
 
   PartitionCleanStat(String partitionPath) {
     this.partitionPath = partitionPath;
   }
 
-  void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
-    if (deletedFileResult) {
-      successDeleteFiles.add(deletePathStr);
+  void addDeletedFileResult(String deletePathStr, boolean success, boolean isBootstrapBasePath) {
+    if (success) {
+      if (isBootstrapBasePath) {
+        successfulDeleteBootstrapBaseFiles.add(deletePathStr);
+      } else {
+        successDeleteFiles.add(deletePathStr);
+      }
     } else {
-      failedDeleteFiles.add(deletePathStr);
+      if (isBootstrapBasePath) {
+        failedDeleteBootstrapBaseFiles.add(deletePathStr);
+      } else {
+        failedDeleteFiles.add(deletePathStr);
+      }
     }
   }
 
-  void addDeleteFilePatterns(String deletePathStr) {
-    deletePathPatterns.add(deletePathStr);
+  void addDeleteFilePatterns(String deletePathStr, boolean isBootstrapBasePath) {
+    if (isBootstrapBasePath) {
+      deleteBootstrapBasePathPatterns.add(deletePathStr);
+    } else {
+      deletePathPatterns.add(deletePathStr);
+    }
   }
 
   PartitionCleanStat merge(PartitionCleanStat other) {
@@ -53,6 +68,9 @@ class PartitionCleanStat implements Serializable {
     successDeleteFiles.addAll(other.successDeleteFiles);
     deletePathPatterns.addAll(other.deletePathPatterns);
     failedDeleteFiles.addAll(other.failedDeleteFiles);
+    deleteBootstrapBasePathPatterns.addAll(other.deleteBootstrapBasePathPatterns);
+    successfulDeleteBootstrapBaseFiles.addAll(other.successfulDeleteBootstrapBaseFiles);
+    failedDeleteBootstrapBaseFiles.addAll(other.failedDeleteBootstrapBaseFiles);
     return this;
   }
 
@@ -67,4 +85,16 @@ class PartitionCleanStat implements Serializable {
   public List<String> failedDeleteFiles() {
     return failedDeleteFiles;
   }
+
+  public List<String> getDeleteBootstrapBasePathPatterns() {
+    return deleteBootstrapBasePathPatterns;
+  }
+
+  public List<String> getSuccessfulDeleteBootstrapBaseFiles() {
+    return successfulDeleteBootstrapBaseFiles;
+  }
+
+  public List<String> getFailedDeleteBootstrapBaseFiles() {
+    return failedDeleteBootstrapBaseFiles;
+  }
 }
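
And a sketch of how the executor records the two kinds of deletions against the extended stat above; the file name and path are illustrative:

    PartitionCleanStat stat = new PartitionCleanStat("2020/08/11");

    // Regular Hudi base file: tracked by file name, as before.
    stat.addDeleteFilePatterns("f1_1-0-1_00000000000001.parquet", false);
    stat.addDeletedFileResult("f1_1-0-1_00000000000001.parquet", true, false);

    // Bootstrap source file: tracked by full path so the externally
    // located file can be identified in the clean metadata.
    stat.addDeleteFilePatterns("/tmp/bootstrap-source/2020/08/11/part-00000.parquet", true);
    stat.addDeletedFileResult("/tmp/bootstrap-source/2020/08/11/part-00000.parquet", true, true);
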
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 0376ec3..a064788 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -18,14 +18,19 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -42,6 +47,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.FileSystemTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -55,6 +62,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.clean.CleanPlanner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 
@@ -64,6 +72,7 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
@@ -76,6 +85,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -467,7 +477,7 @@ public class TestCleaner extends HoodieClientTestBase {
       });
     }
 
-    return cleanMetadata1.getPartitionMetadata().values().stream()
+    Map<String, HoodieCleanStat> cleanStatMap = cleanMetadata1.getPartitionMetadata().values().stream()
         .map(x -> new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
             .withFailedDeletes(x.getFailedDeleteFiles()).withSuccessfulDeletes(x.getSuccessDeleteFiles())
             .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(x.getDeletePathPatterns())
@@ -475,88 +485,144 @@ public class TestCleaner extends HoodieClientTestBase {
                 ? new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "000")
                 : null))
             .build())
-        .collect(Collectors.toList());
+        .collect(Collectors.toMap(HoodieCleanStat::getPartitionPath, x -> x));
+    cleanMetadata1.getBootstrapPartitionMetadata().values().forEach(x -> {
+      HoodieCleanStat s = cleanStatMap.get(x.getPartitionPath());
+      cleanStatMap.put(x.getPartitionPath(), new HoodieCleanStat.Builder().withPartitionPath(x.getPartitionPath())
+          .withFailedDeletes(s.getFailedDeleteFiles()).withSuccessfulDeletes(s.getSuccessDeleteFiles())
+          .withPolicy(HoodieCleaningPolicy.valueOf(x.getPolicy())).withDeletePathPattern(s.getDeletePathPatterns())
+          .withEarliestCommitRetained(Option.ofNullable(s.getEarliestCommitToRetain())
+              .map(y -> new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, y)))
+          .withSuccessfulDeleteBootstrapBaseFiles(x.getSuccessDeleteFiles())
+          .withFailedDeleteBootstrapBaseFiles(x.getFailedDeleteFiles())
+          .withDeleteBootstrapBasePathPatterns(x.getDeletePathPatterns()).build());
+    });
+    return new ArrayList<>(cleanStatMap.values());
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by versions logic.
+   * Test HoodieTable.clean() Cleaning by versions for COW table.
    */
   @Test
   public void testKeepLatestFileVersions() throws IOException {
+    testKeepLatestFileVersions(false);
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by version logic for COW table with Bootstrap source file clean enable.
+   */
+  @Test
+  public void testBootstrapSourceFileCleanWithKeepLatestFileVersions() throws IOException {
+    testKeepLatestFileVersions(true);
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by versions logic.
+   */
+  public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws IOException {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
 
     // make 1 commit, with 1 file per partition
-    HoodieTestUtils.createCommitFiles(basePath, "000");
+    HoodieTestUtils.createCommitFiles(basePath, "00000000000001");
 
-    String file1P0C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
-    String file1P1C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
-    metaClient = HoodieTableMetaClient.reload(metaClient);
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
+
+    String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
         file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    HoodieTestUtils.createCommitFiles(basePath, "001");
+    HoodieTestUtils.createCommitFiles(basePath, "00000000000002");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String file2P0C1 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
     String file2P1C1 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
 
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
-    assertEquals(1,
-        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must clean 1 file");
-    assertEquals(1,
-        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must clean 1 file");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    // enableBootstrapSourceClean would delete the bootstrap base file as the same time
+    HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
+    if (enableBootstrapSourceClean) {
+      HoodieFileStatus fstatus =
+          bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus();
+      // This ensures full path is recorded in metadata.
+      assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
+          "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+              + " but did not contain " + fstatus.getPath().getUri());
+      assertFalse(new File(bootstrapMapping.get(
+          HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+    }
+    cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
+    assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
+        + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
         file2P1C1));
-    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
+    if (enableBootstrapSourceClean) {
+      HoodieFileStatus fstatus =
+          bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus();
+      // This ensures full path is recorded in metadata.
+      assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
+          "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+              + " but did not contain " + fstatus.getPath().getUri());
+      assertFalse(new File(bootstrapMapping.get(
+          HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+    }
     assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
-        "000", file1P1C0));
+        "00000000000001", file1P1C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
-    HoodieTestUtils.createCommitFiles(basePath, "002");
+    HoodieTestUtils.createCommitFiles(basePath, "00000000000003");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update
     String file3P0C2 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
 
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
     assertEquals(2,
         getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
             .getSuccessDeleteFiles().size(), "Must clean two files");
-    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file1P0C0));
-    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file3P0C2));
 
     // No cleaning on partially written file, with no commit.
-    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
+    HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file3P0C2); // update
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
     assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file3P0C2));
   }
 
@@ -604,7 +670,7 @@ public class TestCleaner extends HoodieClientTestBase {
   }
 
   @Test
-  public void testUpgradeDowngrade() {
+  public void testCleanMetadataUpgradeDowngrade() {
     String instantTime = "000";
 
     String partition1 = DEFAULT_PARTITION_PATHS[0];
@@ -693,6 +759,68 @@ public class TestCleaner extends HoodieClientTestBase {
     assertEquals(policies1, policies2);
   }
 
+  @Test
+  public void testCleanPlanUpgradeDowngrade() {
+    String instantTime = "000";
+
+    String partition1 = DEFAULT_PARTITION_PATHS[0];
+    String partition2 = DEFAULT_PARTITION_PATHS[1];
+
+    String fileName1 = "data1_1_000.parquet";
+    String fileName2 = "data2_1_000.parquet";
+
+    Map<String, List<String>> filesToBeCleanedPerPartition = new HashMap<>();
+    filesToBeCleanedPerPartition.put(partition1, Arrays.asList(fileName1));
+    filesToBeCleanedPerPartition.put(partition2, Arrays.asList(fileName2));
+
+    HoodieCleanerPlan version1Plan =
+        HoodieCleanerPlan.newBuilder().setEarliestInstantToRetain(HoodieActionInstant.newBuilder()
+            .setAction(HoodieTimeline.COMMIT_ACTION)
+            .setTimestamp(instantTime).setState(State.COMPLETED.name()).build())
+            .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+            .setFilesToBeDeletedPerPartition(filesToBeCleanedPerPartition)
+            .setVersion(CleanPlanV1MigrationHandler.VERSION)
+            .build();
+
+    // Upgrade and Verify version 2 plan
+    HoodieCleanerPlan version2Plan =
+        new CleanPlanMigrator(metaClient).upgradeToLatest(version1Plan, version1Plan.getVersion());
+    assertEquals(version1Plan.getEarliestInstantToRetain(), version2Plan.getEarliestInstantToRetain());
+    assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy());
+    assertEquals(CleanPlanner.LATEST_CLEAN_PLAN_VERSION, version2Plan.getVersion());
+    // Deprecated Field is not used.
+    assertEquals(0, version2Plan.getFilesToBeDeletedPerPartition().size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).size());
+    assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition1), fileName1).toString(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition1).get(0).getFilePath());
+    assertEquals(new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partition2), fileName2).toString(),
+        version2Plan.getFilePathsToBeDeletedPerPartition().get(partition2).get(0).getFilePath());
+
+    // Downgrade and verify version 1 plan
+    HoodieCleanerPlan gotVersion1Plan = new CleanPlanMigrator(metaClient).migrateToVersion(version2Plan,
+        version2Plan.getVersion(), version1Plan.getVersion());
+    assertEquals(version1Plan.getEarliestInstantToRetain(), gotVersion1Plan.getEarliestInstantToRetain());
+    assertEquals(version1Plan.getPolicy(), version2Plan.getPolicy());
+    assertEquals(version1Plan.getVersion(), gotVersion1Plan.getVersion());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().size(),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).size(),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).size(),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).size());
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition1).get(0));
+    assertEquals(version1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0),
+        gotVersion1Plan.getFilesToBeDeletedPerPartition().get(partition2).get(0));
+    assertTrue(gotVersion1Plan.getFilePathsToBeDeletedPerPartition().isEmpty());
+    assertNull(version1Plan.getFilePathsToBeDeletedPerPartition());
+  }
+
   private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) {
 
     Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata();
@@ -708,47 +836,62 @@ public class TestCleaner extends HoodieClientTestBase {
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table.
    */
   @Test
   public void testKeepLatestCommits() throws IOException {
-    testKeepLatestCommits(false, false);
+    testKeepLatestCommits(false, false, false);
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table. Here the operations are simulated
    * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
    */
   @Test
   public void testKeepLatestCommitsWithFailureRetry() throws IOException {
-    testKeepLatestCommits(true, false);
+    testKeepLatestCommits(true, false, false);
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table.
    */
   @Test
   public void testKeepLatestCommitsIncrMode() throws IOException {
-    testKeepLatestCommits(false, true);
+    testKeepLatestCommits(false, true, false);
+  }
+
+  /**
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table with Bootstrap source file clean enable.
+   */
+  @Test
+  public void testBootstrapSourceFileCleanWithKeepLatestCommits() throws IOException {
+    testKeepLatestCommits(false, false, true);
   }
 
   /**
-   * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
+   * Test HoodieTable.clean() Cleaning by commit logic for COW table.
    */
-  private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException {
+  private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
+            .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
 
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
+
     // make 1 commit, with 1 file per partition
-    HoodieTestUtils.createInflightCommitFiles(basePath, "000");
+    HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000001");
 
-    String file1P0C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
-    String file1P1C0 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
+    String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
+    HoodieTestUtils
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
 
     HoodieCommitMetadata commitMetadata = generateCommitMetadata(
         Collections.unmodifiableMap(new HashMap<String, List<String>>() {
@@ -759,32 +902,32 @@ public class TestCleaner extends HoodieClientTestBase {
         })
     );
     metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
         file1P1C0));
 
     // make next commit, with 1 insert & 1 update per partition
-    HoodieTestUtils.createInflightCommitFiles(basePath, "001");
+    HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000002");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String file2P0C1 =
         HoodieTestUtils
-            .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
+            .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
     String file2P1C1 =
         HoodieTestUtils
-            .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
+            .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
     commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
       {
         put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
@@ -792,100 +935,133 @@ public class TestCleaner extends HoodieClientTestBase {
       }
     });
     metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000002"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
         file2P1C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
         file1P1C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
-    HoodieTestUtils.createInflightCommitFiles(basePath, "002");
+    HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000003");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update
     String file3P0C2 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
 
     commitMetadata = generateCommitMetadata(CollectionUtils
         .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
             CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
     metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
     assertEquals(0, hoodieCleanStatsThree.size(),
         "Must not clean any file. We have to keep 1 version before the latest commit time to keep");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
 
     // make next commit, with 2 updates to existing files, and 1 insert
-    HoodieTestUtils.createInflightCommitFiles(basePath, "003");
+    HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000004");
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file1P0C0); // update
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file2P0C1); // update
     String file4P0C3 =
-        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003");
+        HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004");
     commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
         HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
     metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
 
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry);
-    assertEquals(1,
-        getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean one old file");
+    // enableBootstrapSourceClean would delete the bootstrap base file at the same time
+    HoodieCleanStat partitionCleanStat =
+        getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
 
-    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
+    assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size()
+        + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
+        : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
+
+    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    if (enableBootstrapSourceClean) {
+      assertFalse(new File(bootstrapMapping.get(
+          HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+    }
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file2P0C1));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
         file3P0C2));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
         file4P0C3));
 
     // No cleaning on partially written file, with no commit.
     HoodieTestUtils
-        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
+        .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000005", file3P0C2); // update
     commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
         CollectionUtils.createImmutableList(file3P0C2)));
     metaClient.getActiveTimeline().createNewInstant(
-        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"));
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"));
     metaClient.getActiveTimeline().transitionRequestedToInflight(
-        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"),
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
     HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
     assertEquals(0,
         cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files");
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file1P0C0));
-    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
+    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
         file2P0C1));
   }
 
   /**
+   * Generate the bootstrap index, bootstrap base files and the corresponding metaClient.
+   * @return Mapping from partition path to its BootstrapFileMapping entries
+   * @throws IOException if the bootstrap source files cannot be created
+   */
+  private Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData() throws IOException {
+    // create bootstrap source data path
+    java.nio.file.Path sourcePath = tempDir.resolve("data");
+    java.nio.file.Files.createDirectories(sourcePath);
+    assertTrue(new File(sourcePath.toString()).exists());
+
+    // recreate metaClient with Bootstrap base path
+    metaClient = HoodieTestUtils.init(basePath, getTableType(), sourcePath.toString());
+
+    // generate bootstrap index
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(),
+        new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH}, 1);
+
+    for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) {
+      new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
+      assertTrue(new File(entry.getValue().get(0).getBoostrapFileStatus().getPath().getUri()).createNewFile());
+    }
+    return bootstrapMapping;
+  }
+
+  /**
    * Test Cleaning functionality of table.rollback() API.
    */
   @Test
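Side note on the assertion above: the bootstrap list on a clean stat may be null for stats produced before this change, which is why the test guards the size lookup. A minimal sketch (placeholder class name, not part of the patch) of totalling regular and bootstrap-source deletions for one partition using only the HoodieCleanStat getters added below:

import org.apache.hudi.common.HoodieCleanStat;

public class CleanStatTotals {
  // Count both data files and bootstrap source files removed for one partition;
  // the bootstrap list may be null on stats written by older code.
  static int totalDeleted(HoodieCleanStat stat) {
    int total = stat.getSuccessDeleteFiles().size();
    if (stat.getSuccessDeleteBootstrapBaseFiles() != null) {
      total += stat.getSuccessDeleteBootstrapBaseFiles().size();
    }
    return total;
  }
}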
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index d9901da..857f334 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -70,6 +70,7 @@
             <import>${basedir}/src/main/avro/HoodieCompactionOperation.avsc</import>
             <import>${basedir}/src/main/avro/HoodieSavePointMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieCompactionMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieCleanPartitionMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieCleanMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieCleanerPlan.avsc</import>
             <import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
diff --git a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
index f6c05c5..c26b5a6 100644
--- a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
@@ -24,23 +24,22 @@
      {"name": "totalFilesDeleted", "type": "int"},
      {"name": "earliestCommitToRetain", "type": "string"},
      {"name": "partitionMetadata", "type": {
-     "type" : "map", "values" : {
-        "type": "record",
-        "name": "HoodieCleanPartitionMetadata",
-        "fields": [
-            {"name": "partitionPath", "type": "string"},
-            {"name": "policy", "type": "string"},
-            {"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
-            {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
-            {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
-        ]
-     }
-     }
+        "type" : "map", "values" : "HoodieCleanPartitionMetadata"
+      }
      },
      {
         "name":"version",
         "type":["int", "null"],
         "default": 1
+     },
+     {
+        "name": "bootstrapPartitionMetadata",
+        "type": [ "null", {
+          "type" : "map", 
+          "values" : "HoodieCleanPartitionMetadata",
+          "default" : null
+        }],
+        "default" : null
      }
  ]
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc
similarity index 54%
copy from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
copy to hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc
index cd30a69..877b725 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
+++ b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,21 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.hudi.common.table.timeline.versioning.clean;
-
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.versioning.MetadataMigrator;
-
-import java.util.Arrays;
-
-public class CleanMetadataMigrator extends MetadataMigrator<HoodieCleanMetadata> {
-
-  public CleanMetadataMigrator(HoodieTableMetaClient metaClient) {
-    super(metaClient,
-        Arrays
-            .asList(new CleanV1MigrationHandler(metaClient),
-                new CleanV2MigrationHandler(metaClient)));
-  }
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieCleanPartitionMetadata",
+  "fields": [
+     {"name": "partitionPath", "type": "string"},
+     {"name": "policy", "type": "string"},
+     {"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
+     {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
+     {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
+  ] 
 }
diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
index b87ed77..c4481c2 100644
--- a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
@@ -47,6 +47,7 @@
       "type": "string"
     },
     {
+      /** This is deprecated and replaced by the field filePathsToBeDeletedPerPartition **/
       "name": "filesToBeDeletedPerPartition",
       "type": [
         "null", {
@@ -64,6 +65,33 @@
       "name":"version",
       "type":["int", "null"],
       "default": 1
+    },
+    {
+      "name": "filePathsToBeDeletedPerPartition",
+      "doc": "This field replaces the field filesToBeDeletedPerPartition",
+      "type": [
+        "null", {
+        "type":"map",
+        "values": {
+          "type":"array",
+          "items":{
+            "name":"HoodieCleanFileInfo",
+            "type": "record",
+            "fields":[
+                     {
+                        "name":"filePath",
+                        "type":["null","string"],
+                        "default": null
+                     },
+                     {
+                        "name":"isBootstrapBaseFile",
+                        "type":["null","boolean"],
+                        "default": null
+                     }
+                ]
+          }
+      }}],
+      "default" : null
     }
   ]
 }
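For orientation, a hedged sketch of a version-2 cleaner plan built from the Avro-generated classes; the partition, file paths and policy string are placeholders, and the constructor argument order follows the usages further down in this diff:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;

public class CleanerPlanV2Example {
  public static HoodieCleanerPlan samplePlan() {
    // One partition with a data file and its bootstrap source file marked for deletion.
    return new HoodieCleanerPlan(
        new HoodieActionInstant("", "", ""),            // earliest instant to retain (empty placeholder)
        "KEEP_LATEST_COMMITS",                          // cleaning policy name (placeholder)
        new HashMap<>(),                                // deprecated filesToBeDeletedPerPartition
        CleanPlanV2MigrationHandler.VERSION,            // plan version 2
        Collections.singletonMap("2020/08/11", Arrays.asList(
            new HoodieCleanFileInfo("/table/2020/08/11/f1.parquet", false),
            new HoodieCleanFileInfo("/source/2020/08/11/f1.parquet", true))));
  }
}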
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java
index 5fc3a15..e9de502 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common;
 
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 
 import java.io.Serializable;
@@ -39,17 +40,35 @@ public class HoodieCleanStat implements Serializable {
   private final List<String> successDeleteFiles;
   // Files that could not be deleted
   private final List<String> failedDeleteFiles;
+  // Bootstrap Base Path patterns that were generated for the delete operation
+  private final List<String> deleteBootstrapBasePathPatterns;
+  private final List<String> successDeleteBootstrapBaseFiles;
+  // Files that could not be deleted
+  private final List<String> failedDeleteBootstrapBaseFiles;
   // Earliest commit that was retained in this clean
   private final String earliestCommitToRetain;
 
   public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
       List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) {
+    this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
+        CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
+        CollectionUtils.createImmutableList());
+  }
+
+  public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
+                         List<String> successDeleteFiles, List<String> failedDeleteFiles,
+                         String earliestCommitToRetain, List<String> deleteBootstrapBasePathPatterns,
+                         List<String> successDeleteBootstrapBaseFiles,
+                         List<String> failedDeleteBootstrapBaseFiles) {
     this.policy = policy;
     this.partitionPath = partitionPath;
     this.deletePathPatterns = deletePathPatterns;
     this.successDeleteFiles = successDeleteFiles;
     this.failedDeleteFiles = failedDeleteFiles;
     this.earliestCommitToRetain = earliestCommitToRetain;
+    this.deleteBootstrapBasePathPatterns = deleteBootstrapBasePathPatterns;
+    this.successDeleteBootstrapBaseFiles = successDeleteBootstrapBaseFiles;
+    this.failedDeleteBootstrapBaseFiles = failedDeleteBootstrapBaseFiles;
   }
 
   public HoodieCleaningPolicy getPolicy() {
@@ -72,6 +91,18 @@ public class HoodieCleanStat implements Serializable {
     return failedDeleteFiles;
   }
 
+  public List<String> getDeleteBootstrapBasePathPatterns() {
+    return deleteBootstrapBasePathPatterns;
+  }
+
+  public List<String> getSuccessDeleteBootstrapBaseFiles() {
+    return successDeleteBootstrapBaseFiles;
+  }
+
+  public List<String> getFailedDeleteBootstrapBaseFiles() {
+    return failedDeleteBootstrapBaseFiles;
+  }
+
   public String getEarliestCommitToRetain() {
     return earliestCommitToRetain;
   }
@@ -91,6 +122,9 @@ public class HoodieCleanStat implements Serializable {
     private List<String> failedDeleteFiles;
     private String partitionPath;
     private String earliestCommitToRetain;
+    private List<String> deleteBootstrapBasePathPatterns;
+    private List<String> successDeleteBootstrapBaseFiles;
+    private List<String> failedDeleteBootstrapBaseFiles;
 
     public Builder withPolicy(HoodieCleaningPolicy policy) {
       this.policy = policy;
@@ -112,6 +146,21 @@ public class HoodieCleanStat implements Serializable {
       return this;
     }
 
+    public Builder withDeleteBootstrapBasePathPatterns(List<String> deletePathPatterns) {
+      this.deleteBootstrapBasePathPatterns = deletePathPatterns;
+      return this;
+    }
+
+    public Builder withSuccessfulDeleteBootstrapBaseFiles(List<String> successDeleteFiles) {
+      this.successDeleteBootstrapBaseFiles = successDeleteFiles;
+      return this;
+    }
+
+    public Builder withFailedDeleteBootstrapBaseFiles(List<String> failedDeleteFiles) {
+      this.failedDeleteBootstrapBaseFiles = failedDeleteFiles;
+      return this;
+    }
+
     public Builder withPartitionPath(String partitionPath) {
       this.partitionPath = partitionPath;
       return this;
@@ -125,7 +174,8 @@ public class HoodieCleanStat implements Serializable {
 
     public HoodieCleanStat build() {
       return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles,
-          earliestCommitToRetain);
+          earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles,
+        failedDeleteBootstrapBaseFiles);
     }
   }
 
@@ -137,7 +187,10 @@ public class HoodieCleanStat implements Serializable {
         + ", deletePathPatterns=" + deletePathPatterns
         + ", successDeleteFiles=" + successDeleteFiles
         + ", failedDeleteFiles=" + failedDeleteFiles
-        + ", earliestCommitToRetain='" + earliestCommitToRetain + '\''
+        + ", earliestCommitToRetain='" + earliestCommitToRetain
+        + ", deleteBootstrapBasePathPatterns=" + deleteBootstrapBasePathPatterns
+        + ", successDeleteBootstrapBaseFiles=" + successDeleteBootstrapBaseFiles
+        + ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles + '\''
         + '}';
   }
 }
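As an illustration of the expanded nine-argument constructor above (paths, partition and instant times are placeholders, and KEEP_LATEST_COMMITS is assumed to be a valid HoodieCleaningPolicy value):

import java.util.Arrays;
import java.util.Collections;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.HoodieCleaningPolicy;

public class CleanStatExample {
  public static HoodieCleanStat bootstrapAwareStat() {
    // A stat that records one data-file deletion plus the matching bootstrap source file.
    return new HoodieCleanStat(
        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
        "2020/08/11",                                     // partition path
        Arrays.asList("f1_*.parquet"),                    // deletePathPatterns
        Arrays.asList("f1_001.parquet"),                  // successDeleteFiles
        Collections.emptyList(),                          // failedDeleteFiles
        "00000000000002",                                 // earliestCommitToRetain
        Arrays.asList("/source/2020/08/11/0.parquet"),    // deleteBootstrapBasePathPatterns
        Arrays.asList("/source/2020/08/11/0.parquet"),    // successDeleteBootstrapBaseFiles
        Collections.emptyList());                         // failedDeleteBootstrapBaseFiles
  }
}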
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java
similarity index 54%
copy from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
copy to hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java
index cd30a69..dd6db7a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CleanFileInfo.java
@@ -16,20 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.common.table.timeline.versioning.clean;
+package org.apache.hudi.common.model;
 
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.versioning.MetadataMigrator;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 
-import java.util.Arrays;
+import java.io.Serializable;
 
-public class CleanMetadataMigrator extends MetadataMigrator<HoodieCleanMetadata> {
+public class CleanFileInfo implements Serializable {
 
-  public CleanMetadataMigrator(HoodieTableMetaClient metaClient) {
-    super(metaClient,
-        Arrays
-            .asList(new CleanV1MigrationHandler(metaClient),
-                new CleanV2MigrationHandler(metaClient)));
+  private final String filePath;
+  private final boolean isBootstrapBaseFile;
+
+  public CleanFileInfo(String filePath, boolean isBootstrapBaseFile) {
+    this.filePath = filePath;
+    this.isBootstrapBaseFile = isBootstrapBaseFile;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public boolean isBootstrapBaseFile() {
+    return isBootstrapBaseFile;
+  }
+
+  public HoodieCleanFileInfo toHoodieFileCleanInfo() {
+    return new HoodieCleanFileInfo(filePath, isBootstrapBaseFile);
   }
 }
+
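A minimal usage sketch of the new model class (the path is a placeholder):

import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.common.model.CleanFileInfo;

public class CleanFileInfoExample {
  public static HoodieCleanFileInfo toAvro() {
    // Wrap a bootstrap source file path and convert it to the Avro-generated form.
    CleanFileInfo info = new CleanFileInfo("/source/2020/08/11/0.parquet", true);
    return info.toHoodieFileCleanInfo();
  }
}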
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
index cd30a69..adb5cc0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
@@ -29,7 +29,7 @@ public class CleanMetadataMigrator extends MetadataMigrator<HoodieCleanMetadata>
   public CleanMetadataMigrator(HoodieTableMetaClient metaClient) {
     super(metaClient,
         Arrays
-            .asList(new CleanV1MigrationHandler(metaClient),
-                new CleanV2MigrationHandler(metaClient)));
+            .asList(new CleanMetadataV1MigrationHandler(metaClient),
+                new CleanMetadataV2MigrationHandler(metaClient)));
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java
similarity index 95%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV1MigrationHandler.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java
index 49b70ac..0b69894 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV1MigrationHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java
@@ -31,11 +31,11 @@ import org.apache.hadoop.fs.Path;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class CleanV1MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> {
+public class CleanMetadataV1MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> {
 
   public static final Integer VERSION = 1;
 
-  public CleanV1MigrationHandler(HoodieTableMetaClient metaClient) {
+  public CleanMetadataV1MigrationHandler(HoodieTableMetaClient metaClient) {
     super(metaClient);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java
similarity index 95%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV2MigrationHandler.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java
index 2d8a869..d74dd88 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanV2MigrationHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java
@@ -31,11 +31,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class CleanV2MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> {
+public class CleanMetadataV2MigrationHandler extends AbstractMigratorBase<HoodieCleanMetadata> {
 
   public static final Integer VERSION = 2;
 
-  public CleanV2MigrationHandler(HoodieTableMetaClient metaClient) {
+  public CleanMetadataV2MigrationHandler(HoodieTableMetaClient metaClient) {
     super(metaClient);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanMigrator.java
similarity index 74%
copy from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
copy to hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanMigrator.java
index cd30a69..73e5cbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataMigrator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanMigrator.java
@@ -18,18 +18,19 @@
 
 package org.apache.hudi.common.table.timeline.versioning.clean;
 
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.MetadataMigrator;
 
 import java.util.Arrays;
 
-public class CleanMetadataMigrator extends MetadataMigrator<HoodieCleanMetadata> {
+/**
+ * Manages upgrade/downgrade of cleaner plan.
+ */
+public class CleanPlanMigrator extends MetadataMigrator<HoodieCleanerPlan> {
 
-  public CleanMetadataMigrator(HoodieTableMetaClient metaClient) {
+  public CleanPlanMigrator(HoodieTableMetaClient metaClient) {
     super(metaClient,
-        Arrays
-            .asList(new CleanV1MigrationHandler(metaClient),
-                new CleanV2MigrationHandler(metaClient)));
+        Arrays.asList(new CleanPlanV1MigrationHandler(metaClient), new CleanPlanV2MigrationHandler(metaClient)));
   }
 }
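A rough sketch of how the migrator is driven, mirroring the CleanerUtils.getCleanerPlan change later in this diff; upgradeToLatest is inherited from MetadataMigrator:

import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;

public class CleanPlanUpgradeExample {
  public static HoodieCleanerPlan upgrade(HoodieTableMetaClient metaClient, HoodieCleanerPlan oldPlan) {
    // Rewrites a version-1 plan (file names only) into the version-2 shape
    // (absolute paths plus the bootstrap flag); a plan already at the latest
    // version is expected to pass through unchanged.
    return new CleanPlanMigrator(metaClient).upgradeToLatest(oldPlan, oldPlan.getVersion());
  }
}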
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
new file mode 100644
index 0000000..0010aa2
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.versioning.clean;
+
+import java.util.HashMap;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.AbstractMigratorBase;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CleanPlanV1MigrationHandler extends AbstractMigratorBase<HoodieCleanerPlan> {
+
+  public static final Integer VERSION = 1;
+
+  public CleanPlanV1MigrationHandler(HoodieTableMetaClient metaClient) {
+    super(metaClient);
+  }
+
+  @Override
+  public Integer getManagedVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) {
+    throw new IllegalArgumentException(
+      "This is the lowest version. Plan cannot be any lower version");
+  }
+
+  @Override
+  public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan plan) {
+    if (metaClient.getTableConfig().getBootstrapBasePath().isPresent()) {
+      throw new IllegalArgumentException(
+        "This version do not support METADATA_ONLY bootstrapped tables. Failed to downgrade.");
+    }
+    Map<String, List<String>> filesPerPartition = plan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+        .map(e -> {
+          return Pair.of(e.getKey(), e.getValue().stream().map(v -> new Path(v.getFilePath()).getName())
+            .collect(Collectors.toList()));
+        }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), filesPerPartition, VERSION,
+        new HashMap<>());
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
new file mode 100644
index 0000000..e141e9a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.versioning.clean;
+
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.AbstractMigratorBase;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CleanPlanV2MigrationHandler extends AbstractMigratorBase<HoodieCleanerPlan> {
+
+  public static final Integer VERSION = 2;
+
+  public CleanPlanV2MigrationHandler(HoodieTableMetaClient metaClient) {
+    super(metaClient);
+  }
+
+  @Override
+  public Integer getManagedVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) {
+    Map<String, List<HoodieCleanFileInfo>> filePathsPerPartition =
+        plan.getFilesToBeDeletedPerPartition().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
+          .map(v -> new HoodieCleanFileInfo(
+            new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), e.getKey()), v).toString(), false))
+          .collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), new HashMap<>(), VERSION,
+        filePathsPerPartition);
+  }
+
+  @Override
+  public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan input) {
+    throw new IllegalArgumentException(
+      "This is the current highest version. Plan cannot be any higher version");
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
index 2e31cea..ec29e93 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
@@ -252,7 +252,8 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
   }
 
   /**
-   * Add newly found clean instant.
+   * Add newly found clean instant. Note that cleaner metadata (.clean.completed)
+   * contains only relative paths, unlike clean plans (.clean.requested), which contain absolute paths.
    *
    * @param timeline Timeline
    * @param instant Clean instant
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 96ac4ca..6049ee3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -18,16 +18,20 @@
 
 package org.apache.hudi.common.util;
 
+import java.util.stream.Collectors;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
-import org.apache.hudi.common.table.timeline.versioning.clean.CleanV1MigrationHandler;
-import org.apache.hudi.common.table.timeline.versioning.clean.CleanV2MigrationHandler;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV1MigrationHandler;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -35,14 +39,16 @@ import java.util.List;
 import java.util.Map;
 
 public class CleanerUtils {
-  public static final Integer CLEAN_METADATA_VERSION_1 = CleanV1MigrationHandler.VERSION;
-  public static final Integer CLEAN_METADATA_VERSION_2 = CleanV2MigrationHandler.VERSION;
+  public static final Integer CLEAN_METADATA_VERSION_1 = CleanMetadataV1MigrationHandler.VERSION;
+  public static final Integer CLEAN_METADATA_VERSION_2 = CleanMetadataV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2;
 
   public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
                                                          Option<Long> durationInMs,
                                                          List<HoodieCleanStat> cleanStats) {
     Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new HashMap<>();
+    Map<String, HoodieCleanPartitionMetadata> partitionBootstrapMetadataMap = new HashMap<>();
+
     int totalDeleted = 0;
     String earliestCommitToRetain = null;
     for (HoodieCleanStat stat : cleanStats) {
@@ -50,6 +56,13 @@ public class CleanerUtils {
           new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(),
               stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles());
       partitionMetadataMap.put(stat.getPartitionPath(), metadata);
+      if ((null != stat.getDeleteBootstrapBasePathPatterns())
+          && (!stat.getDeleteBootstrapBasePathPatterns().isEmpty())) {
+        HoodieCleanPartitionMetadata bootstrapMetadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(),
+            stat.getPolicy().name(), stat.getDeleteBootstrapBasePathPatterns(), stat.getSuccessDeleteBootstrapBaseFiles(),
+            stat.getFailedDeleteBootstrapBaseFiles());
+        partitionBootstrapMetadataMap.put(stat.getPartitionPath(), bootstrapMetadata);
+      }
       totalDeleted += stat.getSuccessDeleteFiles().size();
       if (earliestCommitToRetain == null) {
         // This will be the same for all partitions
@@ -57,8 +70,8 @@ public class CleanerUtils {
       }
     }
 
-    return new HoodieCleanMetadata(startCleanTime,
-        durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2);
+    return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted,
+      earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap);
   }
 
   /**
@@ -77,7 +90,7 @@ public class CleanerUtils {
   }
 
   /**
-   * Get Cleaner Plan corresponding to a clean instant.
+   * Get the latest version of the cleaner plan corresponding to a clean instant.
    * @param metaClient  Hoodie Table Meta Client
    * @param cleanInstant Instant referring to clean action
    * @return Cleaner plan corresponding to clean instant
@@ -85,7 +98,18 @@ public class CleanerUtils {
    */
   public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant)
       throws IOException {
-    return TimelineMetadataUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(),
-        HoodieCleanerPlan.class);
+    CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
+    HoodieCleanerPlan cleanerPlan = TimelineMetadataUtils.deserializeAvroMetadata(
+        metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), HoodieCleanerPlan.class);
+    return cleanPlanMigrator.upgradeToLatest(cleanerPlan, cleanerPlan.getVersion());
+  }
+
+  /**
+   * Convert a list of CleanFileInfo instances to a list of Avro-generated HoodieCleanFileInfo instances.
+   * @param cleanFileInfoList list of CleanFileInfo instances to convert
+   * @return the corresponding Avro-generated HoodieCleanFileInfo instances
+   */
+  public static List<HoodieCleanFileInfo> convertToHoodieCleanFileInfoList(List<CleanFileInfo> cleanFileInfoList) {
+    return cleanFileInfoList.stream().map(CleanFileInfo::toHoodieFileCleanInfo).collect(Collectors.toList());
   }
 }
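Tying the pieces together, a sketch of rolling a single bootstrap-aware stat (such as the one sketched after the HoodieCleanStat diff above) into clean metadata; any partition whose stat carries bootstrap delete patterns also gets an entry in the new bootstrapPartitionMetadata map:

import java.util.Collections;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;

public class CleanMetadataExample {
  public static HoodieCleanMetadata convert(HoodieCleanStat stat) {
    // Clean instant time and duration are placeholders.
    return CleanerUtils.convertCleanMetadata("00000000000006", Option.of(1000L),
        Collections.singletonList(stat));
  }
}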
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java
index 428bd8c..ecfb59d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -86,7 +87,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
 
   @Test
   public void testBootstrapIndexConcurrent() throws Exception {
-    Map<String, List<BootstrapFileMapping>> bootstrapMapping  = generateBootstrapIndex(100);
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping  = generateBootstrapIndex(metaClient, BOOTSTRAP_BASE_PATH, PARTITIONS, 100);
     final int numThreads = 20;
     final int numRequestsPerThread = 50;
     ExecutorService service = Executors.newFixedThreadPool(numThreads);
@@ -111,15 +112,15 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
   }
 
   private void testBootstrapIndexOneRound(int numEntriesPerPartition) throws IOException {
-    Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndex(numEntriesPerPartition);
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapIndex(metaClient, BOOTSTRAP_BASE_PATH, PARTITIONS, numEntriesPerPartition);
     validateBootstrapIndex(bootstrapMapping);
   }
 
-  private Map<String, List<BootstrapFileMapping>> generateBootstrapIndex(int numEntriesPerPartition)
-      throws IOException {
-    Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapMapping(numEntriesPerPartition);
+  public static Map<String, List<BootstrapFileMapping>> generateBootstrapIndex(HoodieTableMetaClient metaClient,
+      String sourceBasePath, String[] partitions, int numEntriesPerPartition) {
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = generateBootstrapMapping(sourceBasePath, partitions, numEntriesPerPartition);
     BootstrapIndex index = new HFileBootstrapIndex(metaClient);
-    try (IndexWriter writer = index.createWriter(BOOTSTRAP_BASE_PATH)) {
+    try (IndexWriter writer = index.createWriter(sourceBasePath)) {
       writer.begin();
       bootstrapMapping.entrySet().stream().forEach(e -> writer.appendNextPartition(e.getKey(), e.getValue()));
       writer.finish();
@@ -162,13 +163,14 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
     }
   }
 
-  private Map<String, List<BootstrapFileMapping>> generateBootstrapMapping(int numEntriesPerPartition) {
-    return Arrays.stream(PARTITIONS).map(partition -> {
+  private static Map<String, List<BootstrapFileMapping>> generateBootstrapMapping(String sourceBasePath,
+      String[] partitions, int numEntriesPerPartition) {
+    return Arrays.stream(partitions).map(partition -> {
       return Pair.of(partition, IntStream.range(0, numEntriesPerPartition).mapToObj(idx -> {
         String hudiFileId = UUID.randomUUID().toString();
         String sourceFileName = idx + ".parquet";
         HoodieFileStatus sourceFileStatus = HoodieFileStatus.newBuilder()
-            .setPath(HoodiePath.newBuilder().setUri(BOOTSTRAP_BASE_PATH + "/" + partition + "/" + sourceFileName).build())
+            .setPath(HoodiePath.newBuilder().setUri(sourceBasePath + "/" + partition + "/" + sourceFileName).build())
             .setLength(256 * 1024 * 1024L)
             .setAccessTime(new Date().getTime())
             .setModificationTime(new Date().getTime() + 99999)
@@ -179,7 +181,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
             .setPermission(HoodieFSPermission.newBuilder().setUserAction(FsAction.ALL.name())
                 .setGroupAction(FsAction.READ.name()).setOtherAction(FsAction.NONE.name()).setStickyBit(true).build())
             .build();
-        return new BootstrapFileMapping(BOOTSTRAP_BASE_PATH, partition, partition, sourceFileStatus, hudiFileId);
+        return new BootstrapFileMapping(sourceBasePath, partition, partition, sourceFileStatus, hudiFileId);
       }).collect(Collectors.toList()));
     }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 92d431c..c88ea51 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -45,6 +45,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
@@ -219,7 +220,8 @@ public class HoodieTestUtils {
               os = metaClient.getFs().create(commitFile, true);
               // Write empty clean metadata
               os.write(TimelineMetadataUtils.serializeCleanerPlan(
-                  new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), 1)).get());
+                  new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
+                      CleanPlanV2MigrationHandler.VERSION, new HashMap<>())).get());
             } catch (IOException ioe) {
               throw new HoodieIOException(ioe.getMessage(), ioe);
             } finally {
diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index b53c7d8..1c31ed5 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -551,7 +551,7 @@ public class TestBootstrap extends HoodieClientTestBase {
   }
 
   public static Dataset<Row> generateTestRawTripDataset(double timestamp, int from, int to, List<String> partitionPaths,
-                                                         JavaSparkContext jsc, SQLContext sqlContext) {
+                                                        JavaSparkContext jsc, SQLContext sqlContext) {
     boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
     final List<String> records = new ArrayList<>();
     IntStream.range(from, to).forEach(i -> {