Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/01 03:10:39 UTC

[hudi] branch master updated: [HUDI-1054] Several performance fixes during finalizing writes (#1768)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e79fbc0  [HUDI-1054] Several performance fixes during finalizing writes (#1768)
e79fbc0 is described below

commit e79fbc07fe803bc51cdf4f11948b133bbaa70595
Author: Udit Mehrotra <um...@illinois.edu>
AuthorDate: Fri Jul 31 20:10:28 2020 -0700

    [HUDI-1054] Several performance fixes during finalizing writes (#1768)
    
    Co-authored-by: Udit Mehrotra <ud...@amazon.com>
---
 .../cli/commands/TestArchivedCommitsCommand.java   |   2 +-
 .../hudi/cli/commands/TestCommitsCommand.java      |   2 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |   4 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  13 +++
 .../java/org/apache/hudi/table/HoodieTable.java    |  17 ++--
 .../hudi/table/HoodieTimelineArchiveLog.java       |  13 +--
 .../java/org/apache/hudi/table/MarkerFiles.java    | 100 ++++++++++++++++-----
 .../rollback/BaseRollbackActionExecutor.java       |   2 +-
 .../hudi/io/TestHoodieTimelineArchiveLog.java      |  14 +--
 .../org/apache/hudi/table/TestMarkerFiles.java     |  16 +++-
 10 files changed, 130 insertions(+), 53 deletions(-)
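
At a glance: write finalization and archival previously listed and deleted marker files serially from the driver; this commit fans both operations out over Spark executors, threading a JavaSparkContext through archiveIfRequired() and the MarkerFiles APIs. It also introduces a new knob, hoodie.markers.delete.parallelism (default 100), capping how many Spark tasks delete marker sub-directories.

A minimal sketch of wiring the new knob through the builder method this commit adds (the table path and surrounding setup are placeholders, not part of the change):

    import org.apache.hudi.config.HoodieWriteConfig;

    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")   // placeholder base path
        .withMarkersDeleteParallelism(200)      // new in this commit; defaults to 100
        .build();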

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 313c1bc..4c7ce88 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -92,7 +92,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
 
     // archive
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
-    archiveLog.archiveIfRequired();
+    archiveLog.archiveIfRequired(jsc);
   }
 
   @AfterEach
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 45c340d..44e2b80 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -176,7 +176,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
     // archive
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
-    archiveLog.archiveIfRequired();
+    archiveLog.archiveIfRequired(jsc);
 
     CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
     assertTrue(cr.isSuccess());
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index b2ad315..9782b46 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -337,7 +337,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     try {
 
       // Delete the marker directory for the instant.
-      new MarkerFiles(table, instantTime).quietDeleteMarkerDir();
+      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
 
       // Do an inline compaction if enabled
       if (config.isInlineCompaction()) {
@@ -349,7 +349,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       }
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
-      archiveLog.archiveIfRequired();
+      archiveLog.archiveIfRequired(jsc);
       autoCleanOnCommit(instantTime);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9aecdf7..69758f2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -88,6 +88,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
   public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
   public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+  public static final String MARKERS_DELETE_PARALLELISM = "hoodie.markers.delete.parallelism";
+  public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = "100";
   public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
   public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
       .toString();
@@ -235,6 +237,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
   }
 
+  public int getMarkersDeleteParallelism() {
+    return Integer.parseInt(props.getProperty(MARKERS_DELETE_PARALLELISM));
+  }
+
   public boolean isEmbeddedTimelineServerEnabled() {
     return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
   }
@@ -830,6 +836,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withMarkersDeleteParallelism(int parallelism) {
+      props.setProperty(MARKERS_DELETE_PARALLELISM, String.valueOf(parallelism));
+      return this;
+    }
+
     public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
       props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled));
       return this;
@@ -874,6 +885,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_HOODIE_WRITE_STATUS_CLASS);
       setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,
           DEFAULT_FINALIZE_WRITE_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(MARKERS_DELETE_PARALLELISM), MARKERS_DELETE_PARALLELISM,
+          DEFAULT_MARKERS_DELETE_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
           EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
       setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
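
The same key can also ride in from the Spark datasource path, assuming the usual Hudi behavior of forwarding hoodie.* options into the write config (sketch only; the method, dataset, and path are hypothetical placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    static void writeWithMarkerParallelism(Dataset<Row> df) {
      df.write().format("hudi")
          .option("hoodie.markers.delete.parallelism", "200")  // key added above
          .mode(SaveMode.Append)
          .save("/tmp/hoodie/sample-table");                   // placeholder path
    }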
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 748091e..d8b0c6e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -70,6 +70,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -428,21 +429,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
       }
 
       // we are not including log appends here, since they are already fail-safe.
-      List<String> invalidDataPaths = markers.createdAndMergedDataPaths();
-      List<String> validDataPaths = stats.stream()
+      Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(jsc, config.getFinalizeWriteParallelism());
+      Set<String> validDataPaths = stats.stream()
           .map(HoodieWriteStat::getPath)
           .filter(p -> p.endsWith(this.getBaseFileExtension()))
-          .collect(Collectors.toList());
+          .collect(Collectors.toSet());
+
       // Contains list of partially created files. These needs to be cleaned up.
       invalidDataPaths.removeAll(validDataPaths);
+
       if (!invalidDataPaths.isEmpty()) {
         LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
-      }
-      Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
-          .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
-          .collect(Collectors.groupingBy(Pair::getKey));
+        Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
+            .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
+            .collect(Collectors.groupingBy(Pair::getKey));
 
-      if (!invalidPathsByPartition.isEmpty()) {
         // Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
         // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
         if (consistencyCheckEnabled) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 98d3e05..4be00a3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -54,6 +54,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -121,7 +122,7 @@ public class HoodieTimelineArchiveLog {
   /**
    * Check if commits need to be archived. If yes, archive commits.
    */
-  public boolean archiveIfRequired() throws IOException {
+  public boolean archiveIfRequired(JavaSparkContext jsc) throws IOException {
     try {
       List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
 
@@ -129,7 +130,7 @@ public class HoodieTimelineArchiveLog {
       if (!instantsToArchive.isEmpty()) {
         this.writer = openWriter();
         LOG.info("Archiving instants " + instantsToArchive);
-        archive(instantsToArchive);
+        archive(jsc, instantsToArchive);
         LOG.info("Deleting archived instants " + instantsToArchive);
         success = deleteArchivedInstants(instantsToArchive);
       } else {
@@ -267,7 +268,7 @@ public class HoodieTimelineArchiveLog {
     return success;
   }
 
-  public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException {
     try {
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
@@ -275,7 +276,7 @@ public class HoodieTimelineArchiveLog {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
-          deleteAnyLeftOverMarkerFiles(hoodieInstant);
+          deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
           records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
           if (records.size() >= this.config.getCommitArchivalBatchSize()) {
             writeToFile(wrapperSchema, records);
@@ -293,9 +294,9 @@ public class HoodieTimelineArchiveLog {
     }
   }
 
-  private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) {
+  private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant instant) {
     MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp());
-    if (markerFiles.deleteMarkerDir()) {
+    if (markerFiles.deleteMarkerDir(jsc, config.getMarkersDeleteParallelism())) {
       LOG.info("Cleaned up left over marker directory for instant :" + instant);
     }
   }
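
For callers, the visible archival change is just the extra Spark context argument; per the hunks above, the call-site pattern is now:

    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
    archiveLog.archiveIfRequired(jsc);   // jsc: the caller's JavaSparkContext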
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
index 00eb7df..8a310fd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -18,8 +18,12 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -28,26 +32,27 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.IOType;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Operates on marker files for a given write action (commit, delta commit, compaction).
  */
-public class MarkerFiles {
+public class MarkerFiles implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
 
-  public static String stripMarkerSuffix(String path) {
-    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
-  }
-
   private final String instantTime;
-  private final FileSystem fs;
-  private final Path markerDirPath;
+  private final transient FileSystem fs;
+  private final transient Path markerDirPath;
   private final String basePath;
 
   public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
@@ -64,9 +69,9 @@ public class MarkerFiles {
         instantTime);
   }
 
-  public void quietDeleteMarkerDir() {
+  public void quietDeleteMarkerDir(JavaSparkContext jsc, int parallelism) {
     try {
-      deleteMarkerDir();
+      deleteMarkerDir(jsc, parallelism);
     } catch (HoodieIOException ioe) {
       LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
     }
@@ -74,34 +79,77 @@ public class MarkerFiles {
 
   /**
    * Delete Marker directory corresponding to an instant.
+   *
+   * @param jsc Java Spark Context.
+   * @param parallelism Spark parallelism for deletion.
    */
-  public boolean deleteMarkerDir() {
+  public boolean deleteMarkerDir(JavaSparkContext jsc, int parallelism) {
     try {
-      boolean result = fs.delete(markerDirPath, true);
-      if (result) {
+      if (fs.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+                .map(fileStatus -> fileStatus.getPath().toString())
+                .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+          parallelism = Math.min(markerDirSubPaths.size(), parallelism);
+          jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> {
+            Path subPath = new Path(subPathStr);
+            FileSystem fileSystem = subPath.getFileSystem(conf.get());
+            fileSystem.delete(subPath, true);
+          });
+        }
+
+        boolean result = fs.delete(markerDirPath, true);
         LOG.info("Removing marker directory at " + markerDirPath);
-      } else {
-        LOG.info("No marker directory to delete at " + markerDirPath);
+        return result;
       }
-      return result;
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
+    return false;
   }
 
   public boolean doesMarkerDirExist() throws IOException {
     return fs.exists(markerDirPath);
   }
 
-  public List<String> createdAndMergedDataPaths() throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
-        dataFiles.add(translateMarkerToDataPath(pathStr));
+  public Set<String> createdAndMergedDataPaths(JavaSparkContext jsc, int parallelism) throws IOException {
+    Set<String> dataFiles = new HashSet<>();
+
+    FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
+    List<String> subDirectories = new ArrayList<>();
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile()) {
+        String pathStr = topLevelStatus.getPath().toString();
+        if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+          dataFiles.add(translateMarkerToDataPath(pathStr));
+        }
+      } else {
+        subDirectories.add(topLevelStatus.getPath().toString());
       }
-      return true;
-    }, false);
+    }
+
+    if (subDirectories.size() > 0) {
+      parallelism = Math.min(subDirectories.size(), parallelism);
+      SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+      dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> {
+        Path path = new Path(directory);
+        FileSystem fileSystem = path.getFileSystem(serializedConf.get());
+        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
+        List<String> result = new ArrayList<>();
+        while (itr.hasNext()) {
+          FileStatus status = itr.next();
+          String pathStr = status.getPath().toString();
+          if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+            result.add(translateMarkerToDataPath(pathStr));
+          }
+        }
+        return result.iterator();
+      }).collect());
+    }
+
     return dataFiles;
   }
 
@@ -110,6 +158,10 @@ public class MarkerFiles {
     return MarkerFiles.stripMarkerSuffix(rPath);
   }
 
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+  }
+
   public List<String> allMarkerFilePaths() throws IOException {
     List<String> markerFiles = new ArrayList<>();
     FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
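
The delete path above follows a reusable pattern: list the top level on the driver, ship the sub-paths to executors (re-opening the FileSystem there via SerializableConfiguration, since Hadoop FileSystem handles do not serialize), then drop the now nearly empty root with a single driver-side call. A condensed sketch, assuming jsc, fs, dirPath, and parallelism are supplied by the caller and the same imports as MarkerFiles above:

    SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
    List<String> subPaths = Arrays.stream(fs.listStatus(dirPath))
        .map(status -> status.getPath().toString())
        .collect(Collectors.toList());
    if (!subPaths.isEmpty()) {
      jsc.parallelize(subPaths, Math.min(subPaths.size(), parallelism))
          .foreach(pathStr -> {
            Path subPath = new Path(pathStr);
            // Executors must resolve their own FileSystem from the shipped conf.
            subPath.getFileSystem(conf.get()).delete(subPath, true);
          });
    }
    fs.delete(dirPath, true);  // finally remove the directory itself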
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 846e8a8..90b9bb3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -113,7 +113,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
     }
 
     // Finally, remove the marker files post rollback.
-    new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir();
+    new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
 
     return rollbackMetadata;
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 484caf7..5785fc8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -79,7 +79,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
             .withParallelism(2, 2).forTable("test-trip-table").build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
-    boolean result = archiveLog.archiveIfRequired();
+    boolean result = archiveLog.archiveIfRequired(jsc);
     assertTrue(result);
   }
 
@@ -157,7 +157,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
-    assertTrue(archiveLog.archiveIfRequired());
+    assertTrue(archiveLog.archiveIfRequired(jsc));
 
     // reload the timeline and remove the remaining commits
     timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -246,7 +246,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired();
+    boolean result = archiveLog.archiveIfRequired(jsc);
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");
@@ -289,7 +289,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired();
+    boolean result = archiveLog.archiveIfRequired(jsc);
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");
@@ -315,7 +315,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    assertTrue(archiveLog.archiveIfRequired());
+    assertTrue(archiveLog.archiveIfRequired(jsc));
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertEquals(5, timeline.countInstants(),
         "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
@@ -349,7 +349,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
     assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired();
+    boolean result = archiveLog.archiveIfRequired(jsc);
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
     assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
@@ -397,7 +397,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
 
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf());
-    boolean result = archiveLog.archiveIfRequired();
+    boolean result = archiveLog.archiveIfRequired(jsc);
     assertTrue(result);
     HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
     List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
index 723d9e1..af679ce 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
@@ -28,6 +28,9 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.IOType;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -45,16 +48,23 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
   private MarkerFiles markerFiles;
   private FileSystem fs;
   private Path markerFolderPath;
+  private JavaSparkContext jsc;
 
   @BeforeEach
   public void setup() throws IOException {
     initPath();
     initMetaClient();
+    this.jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestMarkerFiles.class.getName()));
     this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
     this.markerFolderPath =  new Path(metaClient.getMarkerFolderPath("000"));
     this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000");
   }
 
+  @AfterEach
+  public void cleanup() {
+    jsc.stop();
+  }
+
   private void createSomeMarkerFiles() {
     markerFiles.create("2020/06/01", "file1", IOType.MERGE);
     markerFiles.create("2020/06/02", "file2", IOType.APPEND);
@@ -97,7 +107,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
 
     // then
     assertTrue(markerFiles.doesMarkerDirExist());
-    assertTrue(markerFiles.deleteMarkerDir());
+    assertTrue(markerFiles.deleteMarkerDir(jsc, 2));
     assertFalse(markerFiles.doesMarkerDirExist());
   }
 
@@ -105,7 +115,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
   public void testDeletionWhenMarkerDirNotExists() throws IOException {
     // then
     assertFalse(markerFiles.doesMarkerDirExist());
-    assertFalse(markerFiles.deleteMarkerDir());
+    assertFalse(markerFiles.deleteMarkerDir(jsc, 2));
   }
 
   @Test
@@ -120,7 +130,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
     // then
     assertIterableEquals(CollectionUtils.createImmutableList(
         "2020/06/01/file1", "2020/06/03/file3"),
-        markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList())
+        markerFiles.createdAndMergedDataPaths(jsc, 2).stream().sorted().collect(Collectors.toList())
     );
   }