You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/11 21:23:31 UTC

[hudi] branch master updated: [HUDI-1502] MOR rollback and restore support for metadata sync (#2421)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e3d3677  [HUDI-1502] MOR rollback and restore support for metadata sync (#2421)
e3d3677 is described below

commit e3d3677b7e7899705b624925666317f0c074f7c7
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Mon Jan 11 16:23:13 2021 -0500

    [HUDI-1502] MOR rollback and restore support for metadata sync (#2421)
    
    - Adds a field to RollbackMetadata that captures the log files written for rollback blocks
    - Adds a field to RollbackMetadata that captures new log files written by unsynced deltacommits
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   1 +
 .../AbstractMarkerBasedRollbackStrategy.java       |  26 +++--
 .../BaseMergeOnReadRollbackActionExecutor.java     |   3 +-
 .../hudi/table/action/rollback/RollbackUtils.java  |   6 +-
 .../rollback/ListingBasedRollbackHelper.java       |  21 +++-
 .../rollback/SparkMarkerBasedRollbackStrategy.java |  14 +++
 .../hudi/metadata/TestHoodieBackedMetadata.java    |  43 ++++---
 .../rollback/TestMarkerBasedRollbackStrategy.java  | 130 +++++++++++++++------
 .../src/main/avro/HoodieRollbackMetadata.avsc      |  16 ++-
 .../org/apache/hudi/common/HoodieRollbackStat.java |  20 +++-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  13 ++-
 .../table/timeline/TimelineMetadataUtils.java      |  28 ++---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  34 ++++--
 .../hudi/common/table/TestTimelineUtils.java       |  27 ++---
 .../table/view/TestIncrementalFSViewSync.java      |   2 +-
 15 files changed, 268 insertions(+), 116 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3faac2e..c6ea7ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -164,6 +164,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
         // Since the actual log file written to can be different based on when rollover happens, we use the
         // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
         // writers.
+        // https://issues.apache.org/jira/browse/HUDI-1517
         createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));
 
         this.writer = createLogWriter(fileSlice, baseInstantTime);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
index cb6fff9..cc596ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
@@ -53,9 +53,9 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
 
   protected final HoodieWriteConfig config;
 
-  private final String basePath;
+  protected final String basePath;
 
-  private final String instantTime;
+  protected final String instantTime;
 
   public AbstractMarkerBasedRollbackStrategy(HoodieTable<T, I, K, O> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
     this.table = table;
@@ -90,6 +90,7 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
     String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
+    final Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
 
     HoodieLogFormat.Writer writer = null;
     try {
@@ -121,17 +122,26 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
       }
     }
 
-    Map<FileStatus, Long> filesToNumBlocksRollback = Collections.emptyMap();
-    if (config.useFileListingMetadata()) {
-      // When metadata is enabled, the information of files appended to is required
-      filesToNumBlocksRollback = Collections.singletonMap(
+    // the information of files appended to is required for metadata sync
+    Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
           table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
           1L);
-    }
 
     return HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
         .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .build();
+        .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build();
+  }
+
+  /**
+   * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing.
+   * @param partitionPath partition path of interest
+   * @param baseCommitTime base commit time of interest
+   * @param fileId fileId of interest
+   * @return Map<FileStatus, File size>
+   * @throws IOException
+   */
+  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException {
+    return Collections.EMPTY_MAP;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
index 44cd5c8..2e75144 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -57,7 +56,7 @@ public abstract class BaseMergeOnReadRollbackActionExecutor<T extends HoodieReco
   }
 
   @Override
-  protected List<HoodieRollbackStat> executeRollback() throws IOException {
+  protected List<HoodieRollbackStat> executeRollback() {
     HoodieTimer rollbackTimer = new HoodieTimer();
     rollbackTimer.startTimer();
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 1ca0196..8ddb7e9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -74,14 +74,16 @@ public class RollbackUtils {
     final List<String> successDeleteFiles = new ArrayList<>();
     final List<String> failedDeleteFiles = new ArrayList<>();
     final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
-    final List<FileStatus> filesToRollback = new ArrayList<>();
+    final Map<FileStatus, Long> writtenLogFileSizeMap = new HashMap<>();
     Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
     Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
     Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
     Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
     Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
     Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
-    return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+    Option.ofNullable(stat1.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll);
+    Option.ofNullable(stat2.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll);
+    return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap);
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index a614aeb..fcb3882 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -49,6 +50,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
@@ -116,12 +118,22 @@ public class ListingBasedRollbackHelper implements Serializable {
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          String fileId = rollbackRequest.getFileId().get();
+          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+
+          // collect all log files that are supposed to be deleted with this rollback
+          Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+              FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+
           Writer writer = null;
           try {
             writer = HoodieLogFormat.newWriterBuilder()
                 .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
-                .withFileId(rollbackRequest.getFileId().get())
-                .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
+                .withFileId(fileId)
+                .overBaseCommit(latestBaseInstant)
+                .withFs(metaClient.getFs())
                 .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
 
             // generate metadata
@@ -149,9 +161,11 @@ public class ListingBasedRollbackHelper implements Serializable {
               metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
               1L
           );
+
           return new Tuple2<>(rollbackRequest.getPartitionPath(),
               HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+                  .withRollbackBlockAppendResults(filesToNumBlocksRollback)
+                  .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
         }
         default:
           throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
@@ -159,7 +173,6 @@ public class ListingBasedRollbackHelper implements Serializable {
     });
   }
 
-
   /**
    * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
    */
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
index 0598e54..0f36cb8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java
@@ -22,7 +22,10 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.IOType;
@@ -32,10 +35,14 @@ import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
@@ -75,4 +82,11 @@ public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext
       throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
     }
   }
+
+  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+    // collect all log files that are supposed to be deleted with this rollback
+    return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
+        FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
+        .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 7e3dea4..3d770c7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
+
   private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
 
   @TempDir
@@ -261,13 +262,11 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   /**
    * Test rollback of various table operations sync to Metadata Table correctly.
    */
-  //@ParameterizedTest
-  //@EnumSource(HoodieTableType.class)
-  //public void testRollbackOperations(HoodieTableType tableType) throws Exception {
-  @Test
-  public void testRollbackOperations() throws Exception {
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testRollbackOperations(HoodieTableType tableType) throws Exception {
     //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
-    init(HoodieTableType.COPY_ON_WRITE);
+    init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -371,13 +370,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   }
 
   /**
-   * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op
-   * occurs to metadata.
-   * @throws Exception
+   * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata.
+   * Once explicit sync is called, metadata should match.
    */
-  @Test
-  public void testRollbackUnsyncedCommit() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception {
+    init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -389,7 +388,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
     }
-
     String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
       // Commit with metadata disabled
@@ -401,6 +399,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
     }
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) {
+      assertFalse(metadata(client).isInSync());
+      client.syncTableMetadata();
       validateMetadata(client);
     }
   }
@@ -528,8 +528,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   }
 
   /**
-   * Instants on Metadata Table should be archived as per config.
-   * Metadata Table should be automatically compacted as per config.
+   * Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config.
    */
   @Test
   public void testCleaningArchivingAndCompaction() throws Exception {
@@ -752,8 +751,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
 
   /**
    * Validate the metadata tables contents to ensure it matches what is on the file system.
-   *
-   * @throws IOException
    */
   private void validateMetadata(SparkRDDWriteClient client) throws IOException {
     HoodieWriteConfig config = client.getConfig();
@@ -807,7 +804,19 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
         if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
           LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
           LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
+
+          for (String fileName : fsFileNames) {
+            if (!metadataFilenames.contains(fileName)) {
+              LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
+            }
+          }
+          for (String fileName : metadataFilenames) {
+            if (!fsFileNames.contains(fileName)) {
+              LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
+            }
+          }
         }
+
         assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
         assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
index 7acff79..83e2b05 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -18,20 +18,33 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -40,13 +53,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
 
+  private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with listing metadata enable={0}";
+
+  public static Stream<Arguments> configParams() {
+    return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
+  }
+
+  private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+
   @BeforeEach
   public void setUp() throws Exception {
     initPath();
     initSparkContexts();
     initFileSystem();
-    initMetaClient();
-    initDFS();
+    initMetaClient(tableType);
+    initTestDataGenerator();
   }
 
   @AfterEach
@@ -55,7 +76,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
   }
 
   @Test
-  public void testCopyOnWriteRollback() throws Exception {
+  public void testCopyOnWriteRollbackWithTestTable() throws Exception {
     // given: wrote some base files and corresponding markers
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     String f0 = testTable.addRequestedCommit("000")
@@ -85,43 +106,78 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
   }
 
-  @Test
-  public void testMergeOnReadRollback() throws Exception {
-    // given: wrote some base + log files and corresponding markers
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    String f2 = testTable.addRequestedDeltaCommit("000")
-        .getFileIdsWithBaseFilesInPartitions("partA").get("partA");
-    String f1 = testTable.addDeltaCommit("001")
-        .withLogFile("partA", f2)
-        .getFileIdsWithBaseFilesInPartitions("partB").get("partB");
-    String f3 = "f3";
-    String f4 = "f4";
-    testTable.forDeltaCommit("001")
-        .withMarkerFile("partB", f1, IOType.CREATE)
-        .withMarkerFile("partA", f3, IOType.CREATE)
-        .withMarkerFile("partA", f2, IOType.APPEND)
-        .withMarkerFile("partB", f4, IOType.APPEND);
+  @Tag("functional")
+  @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+  @MethodSource("configParams")
+  public void testCopyOnWriteRollback(boolean useFileListingMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+        .withPath(basePath).build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // rollback 2nd commit and ensure stats reflect the info.
+      List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient);
+
+      assertEquals(3, stats.size());
+      for (HoodieRollbackStat stat : stats) {
+        assertEquals(1, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(0, stat.getCommandBlocksCount().size());
+        assertEquals(0, stat.getWrittenLogFileSizeMap().size());
+      }
+    }
+  }
 
-    // when
-    List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
-        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "001"));
+  @Tag("functional")
+  @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+  @MethodSource("configParams")
+  public void testMergeOnReadRollback(boolean useFileListingMetadata) throws Exception {
+    // init MERGE_ON_READ_TABLE
+    tearDown();
+    tableType = HoodieTableType.MERGE_ON_READ;
+    setUp();
+
+    HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+        .withPath(basePath).build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
+
+      // rollback 2nd commit and ensure stats reflect the info.
+      List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient);
+
+      assertEquals(3, stats.size());
+      for (HoodieRollbackStat stat : stats) {
+        assertEquals(0, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(1, stat.getCommandBlocksCount().size());
+        stat.getCommandBlocksCount().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+        assertEquals(1, stat.getWrittenLogFileSizeMap().size());
+        stat.getWrittenLogFileSizeMap().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+      }
+    }
+  }
 
-    // then: ensure files are deleted, rollback block is appended (even if append does not exist)
-    assertEquals(2, stats.size());
-    // will have the log file
-    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
-    assertEquals(1, partBFiles.length);
-    assertTrue(partBFiles[0].getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
-    assertTrue(partBFiles[0].getLen() > 0);
+  private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata, HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient) {
+    String newCommitTime = "001";
+    writeClient.startCommitWithTime(newCommitTime);
 
-    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
-    assertEquals(3, partAFiles.length);
-    assertEquals(2, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
-    assertEquals(1, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+    JavaRDD<WriteStatus> writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
+    writeClient.commit(newCommitTime, writeStatuses);
 
-    // only partB/f1_001 will be deleted
-    assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
-    // partA/f3_001 is non existent
-    assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
+    // Updates
+    newCommitTime = "002";
+    writeClient.startCommitWithTime(newCommitTime);
+    records = dataGen.generateUniqueUpdates(newCommitTime, 50);
+    writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
+    writeStatuses.collect();
+
+    // rollback 2nd commit and ensure stats reflect the info.
+    return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
+        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
   }
+
 }
diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
index 069881e..a972bfd 100644
--- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
@@ -31,18 +31,24 @@
             {"name": "partitionPath", "type": "string"},
             {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
             {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}},
-            {"name": "appendFiles", "type": {
+            {"name": "rollbackLogFiles", "type": {
                 "type": "map",
-                "doc": "Files to which append blocks were written",
+                "doc": "Files to which append blocks were written to capture rollback commit",
                 "values": {
                     "type": "long",
                     "doc": "Size of this file in bytes"
                 }
+            }},
+            {"name": "writtenLogFiles", "type": {
+                "type": "map",
+                "doc": "Log files written that were expected to be rolledback",
+                 "values": {
+                    "type": "long",
+                    "doc": "Size of this file in bytes"
+                }
             }}
         ]
-     }
-     }
-     },
+     }}},
      {
         "name":"version",
         "type":["int", "null"],
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
index a3191fa..3e4ee34 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
@@ -38,13 +38,16 @@ public class HoodieRollbackStat implements Serializable {
   private final List<String> failedDeleteFiles;
   // Count of HoodieLogFile to commandBlocks written for a particular rollback
   private final Map<FileStatus, Long> commandBlocksCount;
+  // All log files with the same base instant as the instant being rolled back
+  private final Map<FileStatus, Long> writtenLogFileSizeMap;
 
   public HoodieRollbackStat(String partitionPath, List<String> successDeleteFiles, List<String> failedDeleteFiles,
-      Map<FileStatus, Long> commandBlocksCount) {
+      Map<FileStatus, Long> commandBlocksCount, Map<FileStatus, Long> writtenLogFileSizeMap) {
     this.partitionPath = partitionPath;
     this.successDeleteFiles = successDeleteFiles;
     this.failedDeleteFiles = failedDeleteFiles;
     this.commandBlocksCount = commandBlocksCount;
+    this.writtenLogFileSizeMap = writtenLogFileSizeMap;
   }
 
   public Map<FileStatus, Long> getCommandBlocksCount() {
@@ -63,6 +66,10 @@ public class HoodieRollbackStat implements Serializable {
     return failedDeleteFiles;
   }
 
+  public Map<FileStatus, Long> getWrittenLogFileSizeMap() {
+    return writtenLogFileSizeMap;
+  }
+
   public static HoodieRollbackStat.Builder newBuilder() {
     return new Builder();
   }
@@ -75,6 +82,7 @@ public class HoodieRollbackStat implements Serializable {
     private List<String> successDeleteFiles;
     private List<String> failedDeleteFiles;
     private Map<FileStatus, Long> commandBlocksCount;
+    private Map<FileStatus, Long> writtenLogFileSizeMap;
     private String partitionPath;
 
     public Builder withDeletedFileResults(Map<FileStatus, Boolean> deletedFiles) {
@@ -100,6 +108,11 @@ public class HoodieRollbackStat implements Serializable {
       return this;
     }
 
+    public Builder withWrittenLogFileSizeMap(Map<FileStatus, Long> writtenLogFileSizeMap) {
+      this.writtenLogFileSizeMap = writtenLogFileSizeMap;
+      return this;
+    }
+
     public Builder withPartitionPath(String partitionPath) {
       this.partitionPath = partitionPath;
       return this;
@@ -115,7 +128,10 @@ public class HoodieRollbackStat implements Serializable {
       if (commandBlocksCount == null) {
         commandBlocksCount = Collections.EMPTY_MAP;
       }
-      return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+      if (writtenLogFileSizeMap == null) {
+        writtenLogFileSizeMap = Collections.EMPTY_MAP;
+      }
+      return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 7978eed..1990c0a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -45,6 +45,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -426,10 +427,14 @@ public class FSUtils {
    */
   public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId,
       final String logFileExtension, final String baseCommitTime) throws IOException {
-    return Arrays
-        .stream(fs.listStatus(partitionPath,
-            path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
-        .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
+    try {
+      return Arrays
+          .stream(fs.listStatus(partitionPath,
+              path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
+          .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
+    } catch (FileNotFoundException e) {
+      return Stream.<HoodieLogFile>builder().build();
+    }
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 962d69d..9b419ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -18,17 +18,6 @@
 
 package org.apache.hudi.common.table.timeline;
 
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableByteArrayInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.fs.FileStatus;
-
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -44,6 +33,17 @@ import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.fs.FileStatus;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Collections;
@@ -71,10 +71,12 @@ public class TimelineMetadataUtils {
     Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
     int totalDeleted = 0;
     for (HoodieRollbackStat stat : rollbackStats) {
-      Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream()
+      Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream()
+          .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
+      Map<String, Long> probableLogFiles = stat.getWrittenLogFileSizeMap().keySet().stream()
           .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
       HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(),
-          stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), appendFiles);
+          stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles, probableLogFiles);
       partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
       totalDeleted += stat.getSuccessDeleteFiles().size();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index ed2a878..5942312 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -34,6 +33,8 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -44,6 +45,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
@@ -246,12 +248,13 @@ public class HoodieTableMetadataUtil {
 
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
       // Has this rollback produced new files?
-      boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+      boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty();
+      boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles && pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
       // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata
       boolean shouldSkip = lastSyncTs.isPresent()
           && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get());
 
-      if (!hasAppendFiles && shouldSkip) {
+      if (!hasNonZeroRollbackLogFiles && shouldSkip) {
         LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced upto to %s",
             rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get()));
         return;
@@ -269,16 +272,31 @@ public class HoodieTableMetadataUtil {
         partitionToDeletedFiles.get(partition).addAll(deletedFiles);
       }
 
-      if (!pm.getAppendFiles().isEmpty()) {
+      BiFunction<Long, Long, Long> fileMergeFn = (oldSize, newSizeCopy) -> {
+        // If a file appears in both the written log files and the rollback log files, keep the larger size,
+        // since the rollback file could have been updated after the written log files were computed.
+        return oldSize > newSizeCopy ? oldSize : newSizeCopy;
+      };
+
+      if (hasRollbackLogFiles) {
         if (!partitionToAppendedFiles.containsKey(partition)) {
           partitionToAppendedFiles.put(partition, new HashMap<>());
         }
 
         // Extract appended file name from the absolute paths saved in getRollbackLogFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
-          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
-            return size + oldSize;
-          });
+        pm.getRollbackLogFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn);
+        });
+      }
+
+      if (pm.getWrittenLogFiles() != null && !pm.getWrittenLogFiles().isEmpty()) {
+        if (!partitionToAppendedFiles.containsKey(partition)) {
+          partitionToAppendedFiles.put(partition, new HashMap<>());
+        }
+
+        // Extract appended file name from the absolute paths saved in getWrittenLogFiles()
+        pm.getWrittenLogFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn);
         });
       }
     });
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index b79ffbb..18c0d3f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -72,8 +72,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts1);
     activeTimeline.createNewInstant(instant1);
     // create replace metadata only with replaced file Ids (no new files created)
-    activeTimeline.saveAsComplete(instant1, 
-        Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition,2, newFilePartition,0, Collections.emptyMap())));
+    activeTimeline.saveAsComplete(instant1,
+        Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, newFilePartition, 0, Collections.emptyMap())));
     metaClient.reloadActiveTimeline();
 
     List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10));
@@ -85,7 +85,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     activeTimeline.createNewInstant(instant2);
     // create replace metadata only with new files (no replaced file Ids)
     activeTimeline.saveAsComplete(instant2,
-        Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition,0, newFilePartition,3, Collections.emptyMap())));
+        Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, newFilePartition, 3, Collections.emptyMap())));
     metaClient.reloadActiveTimeline();
     partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
     assertEquals(1, partitions.size());
@@ -96,7 +96,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     assertTrue(partitions.contains(replacePartition));
     assertTrue(partitions.contains(newFilePartition));
   }
-  
+
   @Test
   public void testGetPartitions() throws IOException {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
@@ -120,20 +120,20 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     // verify modified partitions included cleaned data
     List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
     assertEquals(5, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4", "5"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4", "5"}));
 
     partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
     assertEquals(4, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"}));
 
     // verify only commit actions
     partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
     assertEquals(4, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
 
     partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
     assertEquals(3, partitions.size());
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
   }
 
   @Test
@@ -181,10 +181,10 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
 
     // verify modified partitions included cleaned data
     List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
 
     partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
   }
 
   @Test
@@ -201,7 +201,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     activeTimeline.createNewInstant(instant);
     activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
-    ts =  "1";
+    ts = "1";
     instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
     activeTimeline.createNewInstant(instant);
     Map<String, String> extraMetadata = new HashMap<>();
@@ -241,7 +241,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     List<HoodieInstant> rollbacks = new ArrayList<>();
     rollbacks.add(new HoodieInstant(false, actionType, commitTs));
 
-    HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap());
+    HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap(),
+        Collections.EMPTY_MAP);
     List<HoodieRollbackStat> rollbackStats = new ArrayList<>();
     rollbackStats.add(rollbackStat);
     return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
@@ -264,7 +265,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
   }
 
   private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount,
-                                          String newFilePartition, int newFileCount, Map<String, String> extraMetadata)
+      String newFilePartition, int newFileCount, Map<String, String> extraMetadata)
       throws IOException {
     HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
     for (int i = 1; i <= newFileCount; i++) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 5400dc4..146e0bb 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -556,7 +556,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
       boolean isRestore) throws IOException {
     Map<String, List<String>> partititonToFiles = deleteFiles(files);
     List<HoodieRollbackStat> rollbackStats = partititonToFiles.entrySet().stream().map(e ->
-        new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>())
+        new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>(), new HashMap<>())
     ).collect(Collectors.toList());
 
     List<HoodieInstant> rollbacks = new ArrayList<>();