Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/13 12:03:38 UTC

[hudi] 01/03: [HUDI-3007] Fix issues in HoodieRepairTool (#4564)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 59cfa4b7e4404456c26c9c8df17182ed6e68ccd1
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Jan 12 09:03:27 2022 -0800

    [HUDI-3007] Fix issues in HoodieRepairTool (#4564)
---
 .../org/apache/hudi/table/repair/RepairUtils.java  |   6 +-
 .../org/apache/hudi/HoodieTestCommitGenerator.java | 183 +++++++++
 .../apache/hudi/table/repair/TestRepairUtils.java  | 176 +++++++++
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  47 +++
 .../org/apache/hudi/common/fs/TestFSUtils.java     |  20 +-
 .../apache/hudi/utilities/HoodieRepairTool.java    | 235 ++++++------
 .../hudi/utilities/TestHoodieRepairTool.java       | 409 +++++++++++++++++++++
 7 files changed, 957 insertions(+), 119 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
index 156da66..5aa03a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
@@ -56,17 +56,15 @@ public final class RepairUtils {
    * Tags the instant time of each base or log file from the input file paths.
    *
    * @param basePath          Base path of the table.
-   * @param baseFileExtension Base file extension, e.g., ".parquet".
    * @param allPaths          A {@link List} of file paths to tag.
    * @return A {@link Map} of instant time in {@link String} to a {@link List} of relative file paths.
    */
   public static Map<String, List<String>> tagInstantsOfBaseAndLogFiles(
-      String basePath, String baseFileExtension, List<Path> allPaths) {
+      String basePath, List<Path> allPaths) {
     // Instant time -> Set of base and log file paths
     Map<String, List<String>> instantToFilesMap = new HashMap<>();
     allPaths.forEach(path -> {
-      String instantTime = path.toString().endsWith(baseFileExtension)
-          ? FSUtils.getCommitTime(path.getName()) : FSUtils.getBaseCommitTimeFromLogPath(path);
+      String instantTime = FSUtils.getCommitTime(path.getName());
       instantToFilesMap.computeIfAbsent(instantTime, k -> new ArrayList<>());
       instantToFilesMap.get(instantTime).add(
           FSUtils.getRelativePartitionPath(new Path(basePath), path));
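
Usage note (not part of the commit): tagInstantsOfBaseAndLogFiles no longer takes the base file
extension; the instant time is now parsed from the file name for base and log files alike. A minimal
sketch of calling the new two-argument signature, assuming a hypothetical table at /tmp/hoodie_table
and file names built with the test helper added later in this patch (class name is illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.HoodieTestCommitGenerator;
    import org.apache.hudi.table.repair.RepairUtils;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;

    public class TagInstantsExample {
      public static void main(String[] args) {
        String basePath = "/tmp/hoodie_table";                 // hypothetical table location
        String baseFileId = UUID.randomUUID().toString();
        String logFileId = UUID.randomUUID().toString();
        List<Path> allPaths = Arrays.asList(
            new Path(basePath + "/2022/01/01/" + HoodieTestCommitGenerator.getBaseFilename("000", baseFileId)),
            new Path(basePath + "/2022/01/03/" + HoodieTestCommitGenerator.getLogFilename("001", logFileId)));
        // Instant time ("000", "001") -> file paths relative to the base path.
        Map<String, List<String>> instantToFiles =
            RepairUtils.tagInstantsOfBaseAndLogFiles(basePath, allPaths);
        System.out.println(instantToFiles);
      }
    }
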
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
new file mode 100644
index 0000000..0c4a971
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.hudi.common.table.log.HoodieLogFormat.DEFAULT_WRITE_TOKEN;
+
+public class HoodieTestCommitGenerator {
+  public static final String BASE_FILE_WRITE_TOKEN = "1-0-1";
+  public static final String LOG_FILE_WRITE_TOKEN = DEFAULT_WRITE_TOKEN;
+  private static final Logger LOG = LogManager.getLogger(HoodieTestCommitGenerator.class);
+
+  public static void initCommitInfoForRepairTests(
+      Map<String, List<Pair<String, String>>> baseFileInfo,
+      Map<String, List<Pair<String, String>>> logFileInfo) {
+    baseFileInfo.clear();
+    logFileInfo.clear();
+    baseFileInfo.put("000", CollectionUtils.createImmutableList(
+        new ImmutablePair<>("2022/01/01", UUID.randomUUID().toString()),
+        new ImmutablePair<>("2022/01/02", UUID.randomUUID().toString()),
+        new ImmutablePair<>("2022/01/03", UUID.randomUUID().toString())
+    ));
+    baseFileInfo.put("001", CollectionUtils.createImmutableList(
+        new ImmutablePair<>("2022/01/04", UUID.randomUUID().toString()),
+        new ImmutablePair<>("2022/01/05", UUID.randomUUID().toString())
+    ));
+    baseFileInfo.put("002", CollectionUtils.createImmutableList(
+        new ImmutablePair<>("2022/01/06", UUID.randomUUID().toString())
+    ));
+    logFileInfo.put("001", CollectionUtils.createImmutableList(
+        new ImmutablePair<>("2022/01/03", UUID.randomUUID().toString()),
+        new ImmutablePair<>("2022/01/06", UUID.randomUUID().toString())
+    ));
+  }
+
+  public static void setupTimelineInFS(
+      String basePath,
+      Map<String, List<Pair<String, String>>> baseFileInfo,
+      Map<String, List<Pair<String, String>>> logFileInfo,
+      Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap) throws IOException {
+    instantInfoMap.clear();
+    for (String instantTime : baseFileInfo.keySet()) {
+      Map<String, List<Pair<String, String>>> partitionPathToFileIdAndNameMap = new HashMap<>();
+      baseFileInfo.getOrDefault(instantTime, new ArrayList<>())
+          .forEach(e -> {
+            List<Pair<String, String>> fileInfoList = partitionPathToFileIdAndNameMap
+                .computeIfAbsent(e.getKey(), k -> new ArrayList<>());
+            String fileId = e.getValue();
+            fileInfoList.add(new ImmutablePair<>(fileId, getBaseFilename(instantTime, fileId)));
+          });
+      logFileInfo.getOrDefault(instantTime, new ArrayList<>())
+          .forEach(e -> {
+            List<Pair<String, String>> fileInfoList = partitionPathToFileIdAndNameMap
+                .computeIfAbsent(e.getKey(), k -> new ArrayList<>());
+            String fileId = e.getValue();
+            fileInfoList.add(new ImmutablePair<>(fileId, getLogFilename(instantTime, fileId)));
+          });
+      createCommitAndDataFiles(basePath, instantTime, partitionPathToFileIdAndNameMap);
+      instantInfoMap.put(instantTime, partitionPathToFileIdAndNameMap);
+    }
+  }
+
+  public static String getBaseFilename(String instantTime, String fileId) {
+    return FSUtils.makeDataFileName(instantTime, BASE_FILE_WRITE_TOKEN, fileId);
+  }
+
+  public static String getLogFilename(String instantTime, String fileId) {
+    return FSUtils.makeLogFileName(
+        fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, 1, LOG_FILE_WRITE_TOKEN);
+  }
+
+  public static void createCommitAndDataFiles(
+      String basePath, String instantTime,
+      Map<String, List<Pair<String, String>>> partitionPathToFileIdAndNameMap) throws IOException {
+    String commitFilename = HoodieTimeline.makeCommitFileName(instantTime);
+    HoodieCommitMetadata commitMetadata =
+        generateCommitMetadata(partitionPathToFileIdAndNameMap, Collections.emptyMap());
+    String content = commitMetadata.toJsonString();
+    createCommitFileWithMetadata(basePath, new Configuration(), commitFilename, content);
+    for (String partitionPath : partitionPathToFileIdAndNameMap.keySet()) {
+      partitionPathToFileIdAndNameMap.get(partitionPath)
+          .forEach(fileInfo -> {
+            String filename = fileInfo.getValue();
+            try {
+              createDataFile(basePath, new Configuration(), partitionPath, filename);
+            } catch (IOException e) {
+              LOG.error(String.format("Failed to create data file: %s/%s/%s",
+                  basePath, partitionPath, filename));
+            }
+          });
+    }
+  }
+
+  public static HoodieCommitMetadata generateCommitMetadata(
+      Map<String, List<Pair<String, String>>> partitionPathToFileIdAndNameMap,
+      Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    for (Map.Entry<String, String> entry : extraMetadata.entrySet()) {
+      metadata.addMetadata(entry.getKey(), entry.getValue());
+    }
+    partitionPathToFileIdAndNameMap.forEach((partitionPath, fileInfoList) ->
+        fileInfoList.forEach(fileInfo -> {
+          HoodieWriteStat writeStat = new HoodieWriteStat();
+          writeStat.setPartitionPath(partitionPath);
+          writeStat.setPath(new Path(partitionPath, fileInfo.getValue()).toString());
+          writeStat.setFileId(fileInfo.getKey());
+          // Below are dummy values
+          writeStat.setTotalWriteBytes(10000);
+          writeStat.setPrevCommit("000");
+          writeStat.setNumWrites(10);
+          writeStat.setNumUpdateWrites(15);
+          writeStat.setTotalLogBlocks(2);
+          writeStat.setTotalLogRecords(100);
+          metadata.addWriteStat(partitionPath, writeStat);
+        }));
+    return metadata;
+  }
+
+  public static void createCommitFileWithMetadata(
+      String basePath, Configuration configuration,
+      String filename, String content) throws IOException {
+    Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename);
+    try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) {
+      os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
+    }
+  }
+
+  public static void createDataFile(
+      String basePath, Configuration configuration,
+      String partitionPath, String filename) throws IOException {
+    FileSystem fs = FSUtils.getFs(basePath, configuration);
+    Path filePath = new Path(new Path(basePath, partitionPath), filename);
+    Path parent = filePath.getParent();
+    if (!fs.exists(parent)) {
+      fs.mkdirs(parent);
+    }
+    if (!fs.exists(filePath)) {
+      fs.create(filePath);
+    }
+  }
+}
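
Usage note (not part of the commit): HoodieTestCommitGenerator is a test-only helper that lays out
synthetic commit metadata and empty data/log files on the file system. A minimal sketch of how the
repair tests below drive it (class name and base path are illustrative only):

    import org.apache.hudi.HoodieTestCommitGenerator;
    import org.apache.hudi.common.util.collection.Pair;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class TimelineSetupExample {
      public static void main(String[] args) throws IOException {
        // Instant time -> List<Pair<relativePartitionPath, fileId>>
        Map<String, List<Pair<String, String>>> baseFileInfo = new HashMap<>();
        Map<String, List<Pair<String, String>>> logFileInfo = new HashMap<>();
        // Instant time -> partition path -> List<Pair<fileId, fileName>>, populated by the setup call.
        Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap = new HashMap<>();
        HoodieTestCommitGenerator.initCommitInfoForRepairTests(baseFileInfo, logFileInfo);
        // Writes commit files under <basePath>/.hoodie and empty data files under the partition paths.
        HoodieTestCommitGenerator.setupTimelineInFS(
            "/tmp/hoodie_table", baseFileInfo, logFileInfo, instantInfoMap);
      }
    }
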
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
new file mode 100644
index 0000000..4f8fb1d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.repair;
+
+import org.apache.hudi.HoodieTestCommitGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
+import static org.apache.hudi.HoodieTestCommitGenerator.getLogFilename;
+import static org.apache.hudi.HoodieTestCommitGenerator.initCommitInfoForRepairTests;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestRepairUtils {
+  // Instant time -> List<Pair<relativePartitionPath, fileId>>
+  private static final Map<String, List<Pair<String, String>>> BASE_FILE_INFO = new HashMap<>();
+  private static final Map<String, List<Pair<String, String>>> LOG_FILE_INFO = new HashMap<>();
+  // instant time -> partitionPathToFileIdAndPathMap
+  private final Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap = new HashMap<>();
+  @TempDir
+  public static java.nio.file.Path tempDir;
+  private static String basePath;
+  private static HoodieTableMetaClient metaClient;
+
+  @BeforeAll
+  static void initFileInfo() throws IOException {
+    initCommitInfoForRepairTests(BASE_FILE_INFO, LOG_FILE_INFO);
+    metaClient =
+        HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), HoodieTableType.COPY_ON_WRITE);
+    basePath = metaClient.getBasePath();
+  }
+
+  public void setupTimelineInFS() throws IOException {
+    HoodieTestCommitGenerator.setupTimelineInFS(
+        basePath, BASE_FILE_INFO, LOG_FILE_INFO, instantInfoMap);
+  }
+
+  @Test
+  public void testTagInstantsOfBaseAndLogFiles() {
+    Map<String, List<String>> expectedResult = new HashMap<>();
+    List<Path> inputPathList = new ArrayList<>();
+
+    for (Map.Entry<String, List<Pair<String, String>>> entry : BASE_FILE_INFO.entrySet()) {
+      String instantTime = entry.getKey();
+      List<String> fileNameList = entry.getValue().stream()
+          .map(e -> {
+            String partitionPath = e.getKey();
+            String fileId = e.getValue();
+            return new Path(
+                new Path(partitionPath), getBaseFilename(instantTime, fileId)).toString();
+          })
+          .collect(Collectors.toList());
+      List<String> expectedList = expectedResult.computeIfAbsent(
+          instantTime, k -> new ArrayList<>());
+      expectedList.addAll(fileNameList);
+      inputPathList.addAll(fileNameList.stream()
+          .map(path -> new Path(basePath, path)).collect(Collectors.toList()));
+    }
+
+    for (Map.Entry<String, List<Pair<String, String>>> entry : LOG_FILE_INFO.entrySet()) {
+      String instantTime = entry.getKey();
+      List<String> fileNameList = entry.getValue().stream()
+          .map(e -> {
+            String partitionPath = e.getKey();
+            String fileId = e.getValue();
+            return new Path(
+                new Path(partitionPath), getLogFilename(instantTime, fileId)).toString();
+          })
+          .collect(Collectors.toList());
+      List<String> expectedList = expectedResult.computeIfAbsent(
+          instantTime, k -> new ArrayList<>());
+      expectedList.addAll(fileNameList);
+      inputPathList.addAll(fileNameList.stream()
+          .map(path -> new Path(basePath, path)).collect(Collectors.toList()));
+    }
+
+    assertEquals(expectedResult,
+        RepairUtils.tagInstantsOfBaseAndLogFiles(basePath, inputPathList));
+  }
+
+  @Test
+  public void testGetBaseAndLogFilePathsFromTimeline() throws IOException {
+    setupTimelineInFS();
+    HoodieTimeline timeline = metaClient.getActiveTimeline();
+    HoodieInstant commitInstant = new HoodieInstant(
+        HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
+    HoodieInstant inflightInstant = new HoodieInstant(
+        HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "005");
+    HoodieInstant compactionInstant = new HoodieInstant(
+        HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, "006");
+
+    Map<String, List<Pair<String, String>>> partitionToFileIdAndNameMap =
+        instantInfoMap.get(commitInstant.getTimestamp());
+    Set<String> expectedPaths = partitionToFileIdAndNameMap.entrySet().stream()
+        .flatMap(entry ->
+            entry.getValue().stream()
+                .map(fileInfo -> new Path(entry.getKey(), fileInfo.getValue()).toString())
+                .collect(Collectors.toList())
+                .stream()
+        ).collect(Collectors.toSet());
+    assertEquals(Option.of(expectedPaths),
+        RepairUtils.getBaseAndLogFilePathsFromTimeline(timeline, commitInstant));
+    assertThrows(HoodieException.class,
+        () -> RepairUtils.getBaseAndLogFilePathsFromTimeline(timeline, inflightInstant));
+    assertEquals(Option.empty(),
+        RepairUtils.getBaseAndLogFilePathsFromTimeline(timeline, compactionInstant));
+  }
+
+  @Test
+  public void testFindInstantFilesToRemove() throws IOException {
+    setupTimelineInFS();
+    HoodieInstant existingInstant = new HoodieInstant(
+        HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
+    Map<String, List<Pair<String, String>>> partitionToFileIdAndNameMap =
+        instantInfoMap.get(existingInstant.getTimestamp());
+    List<String> fileListFromFs = partitionToFileIdAndNameMap.entrySet().stream()
+        .flatMap(entry ->
+            entry.getValue().stream()
+                .map(fileInfo -> new Path(entry.getKey(), fileInfo.getValue()).toString())
+                .collect(Collectors.toList())
+                .stream()
+        ).collect(Collectors.toList());
+    String danglingFilePath = new Path("2022/01/02",
+        getBaseFilename(existingInstant.getTimestamp(), UUID.randomUUID().toString())).toString();
+    fileListFromFs.add(danglingFilePath);
+    // Existing instant
+    assertEquals(CollectionUtils.createImmutableList(danglingFilePath),
+        RepairUtils.findInstantFilesToRemove(
+            existingInstant.getTimestamp(), fileListFromFs,
+            metaClient.getActiveTimeline(), metaClient.getArchivedTimeline()));
+    // Non-existing instant
+    assertEquals(fileListFromFs,
+        RepairUtils.findInstantFilesToRemove(
+            "004", fileListFromFs,
+            metaClient.getActiveTimeline(), metaClient.getArchivedTimeline()));
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 74b673d..ceec282 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -721,6 +721,53 @@ public class FSUtils {
     }
   }
 
+  /**
+   * Lists file status at a certain level in the directory hierarchy.
+   * <p>
+   * E.g., given "/tmp/hoodie_table" as the rootPath, and 3 as the expected level,
+   * this method gives back the {@link FileStatus} of all files under
+   * "/tmp/hoodie_table/[*]/[*]/[*]/" folders.
+   *
+   * @param hoodieEngineContext {@link HoodieEngineContext} instance.
+   * @param fs                  {@link FileSystem} instance.
+   * @param rootPath            Root path for the file listing.
+   * @param expectLevel         Expected level of directory hierarchy for files to be added.
+   * @param parallelism         Parallelism for the file listing.
+   * @return A list of file status of files at the level.
+   */
+
+  public static List<FileStatus> getFileStatusAtLevel(
+      HoodieEngineContext hoodieEngineContext, FileSystem fs, Path rootPath,
+      int expectLevel, int parallelism) {
+    List<String> levelPaths = new ArrayList<>();
+    List<FileStatus> result = new ArrayList<>();
+    levelPaths.add(rootPath.toString());
+
+    for (int i = 0; i <= expectLevel; i++) {
+      result = FSUtils.parallelizeFilesProcess(hoodieEngineContext, fs, parallelism,
+          pairOfSubPathAndConf -> {
+            Path path = new Path(pairOfSubPathAndConf.getKey());
+            try {
+              FileSystem fileSystem = path.getFileSystem(pairOfSubPathAndConf.getValue().get());
+              return Arrays.stream(fileSystem.listStatus(path))
+                .collect(Collectors.toList());
+            } catch (IOException e) {
+              throw new HoodieIOException("Failed to list " + path, e);
+            }
+          },
+          levelPaths)
+          .values().stream()
+          .flatMap(list -> list.stream()).collect(Collectors.toList());
+      if (i < expectLevel) {
+        levelPaths = result.stream()
+            .filter(FileStatus::isDirectory)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+      }
+    }
+    return result;
+  }
+
   public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
   }
 }
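
Usage note (not part of the commit): getFileStatusAtLevel returns the entries sitting under
directories a given number of levels below the root, listed in parallel; callers typically filter
the result with FileStatus::isFile. A minimal local sketch, assuming a hypothetical table laid out
as <basePath>/yyyy/MM/dd/<files> (i.e. partition directories at level 3), mirroring the call made
in TestFSUtils below (class name and path are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.engine.HoodieLocalEngineContext;
    import org.apache.hudi.common.fs.FSUtils;

    import java.io.IOException;
    import java.util.List;

    public class ListAtLevelExample {
      public static void main(String[] args) throws IOException {
        String basePath = "/tmp/hoodie_table";                 // hypothetical table location
        Configuration conf = new Configuration();
        FileSystem fs = FSUtils.getFs(basePath, conf);
        // Entries under the level-3 partition directories, listed with a parallelism of 2.
        // Both files and directories come back; filter for files before using them.
        List<FileStatus> filesAtLevel3 = FSUtils.getFileStatusAtLevel(
            new HoodieLocalEngineContext(conf), fs, new Path(basePath), 3, 2);
        filesAtLevel3.forEach(status -> System.out.println(status.getPath()));
      }
    }
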
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index ec4c3b2..02258e7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -67,7 +68,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
   private final long minRollbackToKeep = 10;
   private final long minCleanToKeep = 10;
 
-  private static String TEST_WRITE_TOKEN = "1-0-1";
+  private static final String TEST_WRITE_TOKEN = "1-0-1";
   private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
 
   @Rule
@@ -456,4 +457,21 @@ public class TestFSUtils extends HoodieCommonTestHarness {
       }
     }
   }
+
+  @Test
+  public void testGetFileStatusAtLevel() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    FileSystem fileSystem = metaClient.getFs();
+    prepareTestDirectory(fileSystem, rootDir);
+    List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel(
+        new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem,
+        new Path(basePath), 3, 2);
+    assertEquals(CollectionUtils.createImmutableList(
+            "file:" + basePath + "/.hoodie/.temp/subdir1/file1.txt",
+            "file:" + basePath + "/.hoodie/.temp/subdir2/file2.txt"),
+        fileStatusList.stream()
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .filter(filePath -> filePath.endsWith(".txt"))
+            .collect(Collectors.toList()));
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index d7fa708..d6b74c8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -22,21 +22,22 @@ package org.apache.hudi.utilities;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.repair.RepairUtils;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -48,14 +49,10 @@ import java.io.Serializable;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
 /**
  * A tool with spark-submit to repair Hudi table by finding and deleting dangling
  * base and log files.
@@ -153,15 +150,15 @@ public class HoodieRepairTool {
   // Properties with source, hoodie client, key generator etc.
   private TypedProperties props;
   // Spark context
-  private final JavaSparkContext jsc;
+  private final HoodieEngineContext context;
   private final HoodieTableMetaClient metaClient;
-  private final FileSystemBackedTableMetadata tableMetadata;
+  private final HoodieTableMetadata tableMetadata;
 
   public HoodieRepairTool(JavaSparkContext jsc, Config cfg) {
     if (cfg.propsFilePath != null) {
       cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
     }
-    this.jsc = jsc;
+    this.context = new HoodieSparkEngineContext(jsc);
     this.cfg = cfg;
     this.props = cfg.propsFilePath == null
         ? UtilHelpers.buildProperties(cfg.configs)
@@ -170,13 +167,12 @@ public class HoodieRepairTool {
         .setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath)
         .setLoadActiveTimelineOnLoad(true)
         .build();
+
     this.tableMetadata = new FileSystemBackedTableMetadata(
-        new HoodieSparkEngineContext(jsc),
-        new SerializableConfiguration(jsc.hadoopConfiguration()),
-        cfg.basePath, cfg.assumeDatePartitioning);
+        context, context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
   }
 
-  public void run() {
+  public boolean run() {
     Option<String> startingInstantOption = Option.ofNullable(cfg.startingInstantTime);
     Option<String> endingInstantOption = Option.ofNullable(cfg.endingInstantTime);
 
@@ -201,24 +197,22 @@ public class HoodieRepairTool {
               + "not belonging to any commit are going to be DELETED from the table ******");
           if (checkBackupPathForRepair() < 0) {
             LOG.error("Backup path check failed.");
-            break;
+            return false;
           }
-          doRepair(startingInstantOption, endingInstantOption, false);
-          break;
+          return doRepair(startingInstantOption, endingInstantOption, false);
         case DRY_RUN:
           LOG.info(" ****** The repair tool is in DRY_RUN mode, "
               + "only LOOKING FOR dangling data and log files from the table ******");
-          doRepair(startingInstantOption, endingInstantOption, true);
-          break;
+          return doRepair(startingInstantOption, endingInstantOption, true);
         case UNDO:
           if (checkBackupPathAgainstBasePath() < 0) {
             LOG.error("Backup path check failed.");
-            break;
+            return false;
           }
-          undoRepair();
-          break;
+          return undoRepair();
         default:
           LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+          return false;
       }
     } catch (IOException e) {
       throw new HoodieIOException("Unable to repair table in " + cfg.basePath, e);
@@ -246,69 +240,98 @@ public class HoodieRepairTool {
    * Copies the list of files from source base path to destination base path.
    * The destination file path (base + relative) should not already exist.
    *
-   * @param jsc               {@link JavaSparkContext} instance.
+   * @param context           {@link HoodieEngineContext} instance.
    * @param relativeFilePaths A {@link List} of relative file paths for copying.
    * @param sourceBasePath    Source base path.
    * @param destBasePath      Destination base path.
-   * @param parallelism       Parallelism.
    * @return {@code true} if all successful; {@code false} otherwise.
    */
   static boolean copyFiles(
-      JavaSparkContext jsc, List<String> relativeFilePaths, String sourceBasePath,
-      String destBasePath, int parallelism) {
-    SerializableConfiguration conf = new SerializableConfiguration(jsc.hadoopConfiguration());
-    List<Boolean> allResults = jsc.parallelize(relativeFilePaths, parallelism)
+      HoodieEngineContext context, List<String> relativeFilePaths, String sourceBasePath,
+      String destBasePath) {
+    SerializableConfiguration conf = context.getHadoopConf();
+    List<Boolean> allResults = context.parallelize(relativeFilePaths)
         .mapPartitions(iterator -> {
           List<Boolean> results = new ArrayList<>();
           FileSystem fs = FSUtils.getFs(destBasePath, conf.get());
           iterator.forEachRemaining(filePath -> {
             boolean success = false;
+            Path sourcePath = new Path(sourceBasePath, filePath);
             Path destPath = new Path(destBasePath, filePath);
             try {
               if (!fs.exists(destPath)) {
-                FileIOUtils.copy(fs, new Path(sourceBasePath, filePath), destPath);
+                FileIOUtils.copy(fs, sourcePath, destPath);
                 success = true;
               }
             } catch (IOException e) {
               // Copy Fail
+              LOG.error(String.format("Copying file fails: source [%s], destination [%s]",
+                  sourcePath, destPath));
             } finally {
               results.add(success);
             }
           });
           return results.iterator();
-        })
-        .collect();
+        }, true)
+        .collectAsList();
     return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false);
   }
 
   /**
    * Lists all Hoodie files from the table base path.
    *
-   * @param basePathStr Table base path.
-   * @param conf        {@link Configuration} instance.
-   * @return An array of {@link FileStatus} of all Hoodie files.
+   * @param context       {@link HoodieEngineContext} instance.
+   * @param basePathStr   Table base path.
+   * @param expectedLevel Expected level in the directory hierarchy to include the file status.
+   * @param parallelism   Parallelism for the file listing.
+   * @return A list of absolute file paths of all Hoodie files.
    * @throws IOException upon errors.
    */
-  static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration conf) throws IOException {
-    final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values())
-        .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
-    final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
-    FileSystem fs = FSUtils.getFs(basePathStr, conf);
+  static List<String> listFilesFromBasePath(
+      HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) {
+    FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
     Path basePath = new Path(basePathStr);
+    return FSUtils.getFileStatusAtLevel(
+            context, fs, basePath, expectedLevel, parallelism).stream()
+        .filter(fileStatus -> {
+          if (!fileStatus.isFile()) {
+            return false;
+          }
+          return FSUtils.isDataFile(fileStatus.getPath());
+        })
+        .map(fileStatus -> fileStatus.getPath().toString())
+        .collect(Collectors.toList());
+  }
 
-    try {
-      return Arrays.stream(fs.listStatus(basePath, path -> {
-        String extension = FSUtils.getFileExtension(path.getName());
-        return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
-      })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
-    } catch (IOException e) {
-      // return empty FileStatus if partition does not exist already
-      if (!fs.exists(basePath)) {
-        return new FileStatus[0];
-      } else {
-        throw e;
-      }
-    }
+  /**
+   * Deletes files from table base path.
+   *
+   * @param context           {@link HoodieEngineContext} instance.
+   * @param basePath          Base path of the table.
+   * @param relativeFilePaths A {@link List} of relative file paths for deleting.
+   */
+  static boolean deleteFiles(
+      HoodieEngineContext context, String basePath, List<String> relativeFilePaths) {
+    SerializableConfiguration conf = context.getHadoopConf();
+    return context.parallelize(relativeFilePaths)
+        .mapPartitions(iterator -> {
+          FileSystem fs = FSUtils.getFs(basePath, conf.get());
+          List<Boolean> results = new ArrayList<>();
+          iterator.forEachRemaining(relativeFilePath -> {
+            boolean success = false;
+            try {
+              success = fs.delete(new Path(basePath, relativeFilePath), false);
+            } catch (IOException e) {
+              LOG.warn("Failed to delete file " + relativeFilePath);
+            } finally {
+              results.add(success);
+            }
+          });
+          return results.iterator();
+        }, true)
+        .collectAsList()
+        .stream().reduce((a, b) -> a && b)
+        .orElse(true);
   }
 
   /**
@@ -319,15 +342,14 @@ public class HoodieRepairTool {
    * @param isDryRun              Is dry run.
    * @throws IOException upon errors.
    */
-  void doRepair(
+  boolean doRepair(
       Option<String> startingInstantOption, Option<String> endingInstantOption, boolean isDryRun) throws IOException {
     // Scans all partitions to find base and log files in the base path
     List<Path> allFilesInPartitions = getBaseAndLogFilePathsFromFileSystem();
     // Buckets the files based on instant time
     // instant time -> relative paths of base and log files to base path
     Map<String, List<String>> instantToFilesMap = RepairUtils.tagInstantsOfBaseAndLogFiles(
-        metaClient.getBasePath(),
-        metaClient.getTableConfig().getBaseFileFormat().getFileExtension(), allFilesInPartitions);
+        metaClient.getBasePath(), allFilesInPartitions);
     List<String> instantTimesToRepair = instantToFilesMap.keySet().stream()
         .filter(instant -> (!startingInstantOption.isPresent()
             || instant.compareTo(startingInstantOption.get()) >= 0)
@@ -340,30 +362,30 @@ public class HoodieRepairTool {
     // This assumes that the archived timeline only has completed instants so this is safe
     archivedTimeline.loadCompletedInstantDetailsInMemory();
 
-    int parallelism = Math.max(Math.min(instantTimesToRepair.size(), cfg.parallelism), 1);
-    List<Tuple2<String, List<String>>> instantFilesToRemove =
-        jsc.parallelize(instantTimesToRepair, parallelism)
-            .mapToPair(instantToRepair ->
-                new Tuple2<>(instantToRepair, RepairUtils.findInstantFilesToRemove(instantToRepair,
+    List<ImmutablePair<String, List<String>>> instantFilesToRemove =
+        context.parallelize(instantTimesToRepair)
+            .map(instantToRepair ->
+                new ImmutablePair<>(instantToRepair, RepairUtils.findInstantFilesToRemove(instantToRepair,
                     instantToFilesMap.get(instantToRepair), activeTimeline, archivedTimeline)))
-            .collect();
+            .collectAsList();
 
-    List<Tuple2<String, List<String>>> instantsWithDanglingFiles =
-        instantFilesToRemove.stream().filter(e -> !e._2.isEmpty()).collect(Collectors.toList());
+    List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles =
+        instantFilesToRemove.stream().filter(e -> !e.getValue().isEmpty()).collect(Collectors.toList());
     printRepairInfo(instantTimesToRepair, instantsWithDanglingFiles);
     if (!isDryRun) {
-      List<String> relativeFilePathsToDelete =
-          instantsWithDanglingFiles.stream().flatMap(e -> e._2.stream()).collect(Collectors.toList());
+      List<String> relativeFilePathsToDelete = instantsWithDanglingFiles.stream()
+          .flatMap(e -> e.getValue().stream())
+          .collect(Collectors.toList());
       if (relativeFilePathsToDelete.size() > 0) {
-        parallelism = Math.max(Math.min(relativeFilePathsToDelete.size(), cfg.parallelism), 1);
-        if (!backupFiles(relativeFilePathsToDelete, parallelism)) {
+        if (!backupFiles(relativeFilePathsToDelete)) {
           LOG.error("Error backing up dangling files. Exiting...");
-          return;
+          return false;
         }
-        deleteFiles(relativeFilePathsToDelete, parallelism);
+        return deleteFiles(context, cfg.basePath, relativeFilePathsToDelete);
       }
       LOG.info(String.format("Table repair on %s is successful", cfg.basePath));
     }
+    return true;
   }
 
   /**
@@ -387,22 +409,38 @@ public class HoodieRepairTool {
    *
    * @throws IOException upon errors.
    */
-  void undoRepair() throws IOException {
+  boolean undoRepair() throws IOException {
     FileSystem fs = metaClient.getFs();
     String backupPathStr = cfg.backupPath;
     Path backupPath = new Path(backupPathStr);
     if (!fs.exists(backupPath)) {
       LOG.error("Cannot find backup path: " + backupPath);
-      return;
+      return false;
     }
 
-    List<String> relativeFilePaths = Arrays.stream(
-            listFilesFromBasePath(backupPathStr, jsc.hadoopConfiguration()))
-        .map(fileStatus ->
-            FSUtils.getPartitionPath(backupPathStr, fileStatus.getPath().toString()).toString())
+    List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths();
+
+    if (allPartitionPaths.isEmpty()) {
+      LOG.error("Cannot get one partition path since there is no partition available");
+      return false;
+    }
+
+    int partitionLevels = getExpectedLevelBasedOnPartitionPath(allPartitionPaths.get(0));
+
+    List<String> relativeFilePaths = listFilesFromBasePath(
+        context, backupPathStr, partitionLevels, cfg.parallelism).stream()
+        .map(filePath ->
+            FSUtils.getRelativePartitionPath(new Path(backupPathStr), new Path(filePath)))
         .collect(Collectors.toList());
-    int parallelism = Math.max(Math.min(relativeFilePaths.size(), cfg.parallelism), 1);
-    restoreFiles(relativeFilePaths, parallelism);
+    return restoreFiles(relativeFilePaths);
+  }
+
+  int getExpectedLevelBasedOnPartitionPath(String partitionPath) {
+    if (StringUtils.isNullOrEmpty(partitionPath)) {
+      return 0;
+    }
+    String[] partitionParts = partitionPath.split("/");
+    return partitionParts.length;
   }
 
   /**
@@ -455,48 +493,20 @@ public class HoodieRepairTool {
    * Backs up dangling files from table base path to backup path.
    *
    * @param relativeFilePaths A {@link List} of relative file paths for backup.
-   * @param parallelism       Parallelism for copying.
    * @return {@code true} if all successful; {@code false} otherwise.
    */
-  boolean backupFiles(List<String> relativeFilePaths, int parallelism) {
-    return copyFiles(jsc, relativeFilePaths, cfg.basePath, cfg.backupPath, parallelism);
+  boolean backupFiles(List<String> relativeFilePaths) {
+    return copyFiles(context, relativeFilePaths, cfg.basePath, cfg.backupPath);
   }
 
   /**
    * Restores dangling files from backup path to table base path.
    *
    * @param relativeFilePaths A {@link List} of relative file paths for restoring.
-   * @param parallelism       Parallelism for copying.
    * @return {@code true} if all successful; {@code false} otherwise.
    */
-  boolean restoreFiles(List<String> relativeFilePaths, int parallelism) {
-    return copyFiles(jsc, relativeFilePaths, cfg.backupPath, cfg.basePath, parallelism);
-  }
-
-  /**
-   * Deletes files from table base path.
-   *
-   * @param relativeFilePaths A {@link List} of relative file paths for deleting.
-   * @param parallelism       Parallelism for deleting.
-   */
-  void deleteFiles(List<String> relativeFilePaths, int parallelism) {
-    jsc.parallelize(relativeFilePaths, parallelism)
-        .mapPartitions(iterator -> {
-          FileSystem fs = metaClient.getFs();
-          List<Boolean> results = new ArrayList<>();
-          iterator.forEachRemaining(filePath -> {
-            boolean success = false;
-            try {
-              success = fs.delete(new Path(filePath), false);
-            } catch (IOException e) {
-              LOG.warn("Failed to delete file " + filePath);
-            } finally {
-              results.add(success);
-            }
-          });
-          return results.iterator();
-        })
-        .collect();
+  boolean restoreFiles(List<String> relativeFilePaths) {
+    return copyFiles(context, relativeFilePaths, cfg.backupPath, cfg.basePath);
   }
 
   /**
@@ -506,17 +516,14 @@ public class HoodieRepairTool {
    * @param instantsWithDanglingFiles A list of instants with dangling files.
    */
   private void printRepairInfo(
-      List<String> instantTimesToRepair, List<Tuple2<String, List<String>>> instantsWithDanglingFiles) {
+      List<String> instantTimesToRepair, List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles) {
     int numInstantsToRepair = instantsWithDanglingFiles.size();
     LOG.warn("Number of instants verified based on the base and log files: "
         + instantTimesToRepair.size());
     LOG.warn("Instant timestamps: " + instantTimesToRepair);
     LOG.warn("Number of instants to repair: " + numInstantsToRepair);
     if (numInstantsToRepair > 0) {
-      instantsWithDanglingFiles.forEach(e -> {
-        LOG.warn(" -> Instant " + numInstantsToRepair);
-        LOG.warn("   ** Removing files: " + e._2);
-      });
+      instantsWithDanglingFiles.forEach(e -> LOG.warn("   ** Removing files: " + e.getValue()));
     }
   }
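
Usage note (not part of the commit): the tool now runs on HoodieEngineContext and reports success
through boolean return values from run()/doRepair()/undoRepair(). A minimal sketch of driving it
programmatically in DRY_RUN mode, the way the new tests below do (the class name, Spark master and
paths are illustrative, and the base path is assumed to point at an existing Hudi table):

    import org.apache.hudi.utilities.HoodieRepairTool;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class RepairDryRunExample {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local[2]").setAppName("hoodie-repair-dry-run"));
        HoodieRepairTool.Config config = new HoodieRepairTool.Config();
        config.basePath = "/tmp/hoodie_table";                 // hypothetical existing table
        config.backupPath = "/tmp/hoodie_table_backup";        // only consulted by REPAIR/UNDO modes
        config.runningMode = HoodieRepairTool.Mode.DRY_RUN.toString();
        // run() now returns true on success and false on failure instead of returning void.
        boolean ok = new HoodieRepairTool(jsc, config).run();
        jsc.stop();
        System.out.println("Dry run succeeded: " + ok);
      }
    }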
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
new file mode 100644
index 0000000..8d3917f
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.HoodieTestCommitGenerator;
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.testutils.providers.SparkProvider;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.provider.Arguments;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
+import static org.apache.hudi.HoodieTestCommitGenerator.getLogFilename;
+import static org.apache.hudi.HoodieTestCommitGenerator.initCommitInfoForRepairTests;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieRepairTool extends HoodieCommonTestHarness implements SparkProvider {
+  private static final Logger LOG = LogManager.getLogger(TestHoodieRepairTool.class);
+  // Instant time -> List<Pair<relativePartitionPath, fileId>>
+  private static final Map<String, List<Pair<String, String>>> BASE_FILE_INFO = new HashMap<>();
+  private static final Map<String, List<Pair<String, String>>> LOG_FILE_INFO = new HashMap<>();
+  // Relative paths to base path for dangling files
+  private static final List<String> DANGLING_DATA_FILE_LIST = new ArrayList<>();
+  private static transient SparkSession spark;
+  private static transient SQLContext sqlContext;
+  private static transient JavaSparkContext jsc;
+  private static transient HoodieSparkEngineContext context;
+  // instant time -> partitionPathToFileIdAndNameMap
+  private final Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap = new HashMap<>();
+  private final List<String> allFileAbsolutePathList = new ArrayList<>();
+  private java.nio.file.Path backupTempDir;
+
+  @BeforeAll
+  static void initFileInfo() {
+    initCommitInfoForRepairTests(BASE_FILE_INFO, LOG_FILE_INFO);
+    initDanglingDataFileList();
+  }
+
+  @BeforeEach
+  public void initWithCleanState() throws IOException {
+    boolean initialized = spark != null;
+    if (!initialized) {
+      SparkConf sparkConf = conf();
+      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieReadClient.addHoodieSupport(sparkConf);
+      spark = SparkSession.builder().config(sparkConf).getOrCreate();
+      sqlContext = spark.sqlContext();
+      jsc = new JavaSparkContext(spark.sparkContext());
+      context = new HoodieSparkEngineContext(jsc);
+    }
+    initPath();
+    metaClient = HoodieTestUtils.init(basePath, getTableType());
+    backupTempDir = tempDir.resolve("backup");
+    cleanUpDanglingDataFilesInFS();
+    cleanUpBackupTempDir();
+    HoodieTestCommitGenerator.setupTimelineInFS(
+        basePath, BASE_FILE_INFO, LOG_FILE_INFO, instantInfoMap);
+    allFileAbsolutePathList.clear();
+    allFileAbsolutePathList.addAll(instantInfoMap.entrySet().stream()
+        .flatMap(e -> e.getValue().entrySet().stream()
+            .flatMap(partition -> partition.getValue().stream()
+                .map(fileInfo -> new Path(
+                    new Path(basePath, partition.getKey()), fileInfo.getValue()).toString())
+                .collect(Collectors.toList())
+                .stream())
+            .collect(Collectors.toList())
+            .stream()
+        )
+        .collect(Collectors.toList()));
+  }
+
+  @AfterEach
+  public void cleanUp() throws IOException {
+    cleanUpDanglingDataFilesInFS();
+    cleanUpBackupTempDir();
+  }
+
+  @AfterAll
+  public static synchronized void resetSpark() {
+    if (spark != null) {
+      spark.close();
+      spark = null;
+    }
+  }
+
+  private void cleanUpDanglingDataFilesInFS() {
+    FileSystem fs = metaClient.getFs();
+    DANGLING_DATA_FILE_LIST.forEach(
+        relativeFilePath -> {
+          Path path = new Path(basePath, relativeFilePath);
+          try {
+            if (fs.exists(path)) {
+              fs.delete(path, false);
+            }
+          } catch (IOException e) {
+            throw new HoodieIOException("Unable to delete file: " + path);
+          }
+        }
+    );
+  }
+
+  private void cleanUpBackupTempDir() throws IOException {
+    FileSystem fs = metaClient.getFs();
+    fs.delete(new Path(backupTempDir.toAbsolutePath().toString()), true);
+  }
+
+  private static void initDanglingDataFileList() {
+    DANGLING_DATA_FILE_LIST.add(
+        new Path("2022/01/01",
+            getBaseFilename("000", UUID.randomUUID().toString())).toString());
+    DANGLING_DATA_FILE_LIST.add(
+        new Path("2022/01/06",
+            getLogFilename("001", UUID.randomUUID().toString())).toString());
+  }
+
+  private Stream<Arguments> configPathParams() {
+    Object[][] data = new Object[][] {
+        {null, basePath, -1}, {basePath + "/backup", basePath, -1},
+        {"/tmp/backup", basePath, 0}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  @Test
+  public void testCheckBackupPathAgainstBasePath() {
+    configPathParams().forEach(arguments -> {
+      Object[] args = arguments.get();
+      String backupPath = (String) args[0];
+      String basePath = (String) args[1];
+      int expectedResult = (Integer) args[2];
+
+      HoodieRepairTool.Config config = new HoodieRepairTool.Config();
+      config.backupPath = backupPath;
+      config.basePath = basePath;
+      HoodieRepairTool tool = new HoodieRepairTool(jsc, config);
+      assertEquals(expectedResult, tool.checkBackupPathAgainstBasePath());
+    });
+  }
+
+  private Stream<Arguments> configPathParamsWithFS() throws IOException {
+    SecureRandom random = new SecureRandom();
+    long randomLong = random.nextLong();
+    String emptyBackupPath = "/tmp/empty_backup_" + randomLong;
+    FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(emptyBackupPath));
+    String nonEmptyBackupPath = "/tmp/nonempty_backup_" + randomLong;
+    FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(nonEmptyBackupPath));
+    FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(nonEmptyBackupPath, ".hoodie"));
+    Object[][] data = new Object[][] {
+        {null, basePath, 0}, {"/tmp/backup", basePath, 0},
+        {emptyBackupPath, basePath, 0}, {basePath + "/backup", basePath, -1},
+        {nonEmptyBackupPath, basePath, -1},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  @Test
+  public void testCheckBackupPathForRepair() throws IOException {
+    for (Arguments arguments: configPathParamsWithFS().collect(Collectors.toList())) {
+      Object[] args = arguments.get();
+      String backupPath = (String) args[0];
+      String basePath = (String) args[1];
+      int expectedResult = (Integer) args[2];
+
+      HoodieRepairTool.Config config = new HoodieRepairTool.Config();
+      config.backupPath = backupPath;
+      config.basePath = basePath;
+      HoodieRepairTool tool = new HoodieRepairTool(jsc, config);
+      assertEquals(expectedResult, tool.checkBackupPathForRepair());
+      if (backupPath == null) {
+        // Backup path should be created if not provided
+        assertNotNull(config.backupPath);
+      }
+    }
+  }
+
+  @Test
+  public void testRepairWithIntactInstants() throws IOException {
+
+    testRepairToolWithMode(
+        Option.empty(), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(),
+        backupTempDir.toAbsolutePath().toString(), true,
+        allFileAbsolutePathList, Collections.emptyList());
+  }
+
+  @Test
+  public void testRepairWithBrokenInstants() throws IOException {
+    List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
+    String backupPath = backupTempDir.toAbsolutePath().toString();
+    List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
+        .map(filePath -> new Path(backupPath, filePath).toString())
+        .collect(Collectors.toList());
+    List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
+    existingFileList.addAll(backupDanglingFileList);
+
+    testRepairToolWithMode(
+        Option.empty(), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(),
+        backupPath, true,
+        existingFileList, tableDanglingFileList);
+  }
+
+  @Test
+  public void testRepairWithOneBrokenInstant() throws IOException {
+    List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
+    String backupPath = backupTempDir.toAbsolutePath().toString();
+    List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST
+        .subList(1, 2).stream()
+        .map(filePath -> new Path(backupPath, filePath).toString())
+        .collect(Collectors.toList());
+    List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
+    existingFileList.addAll(backupDanglingFileList);
+    existingFileList.addAll(tableDanglingFileList.subList(0, 1));
+
+    testRepairToolWithMode(
+        Option.of("001"), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(),
+        backupPath, true,
+        existingFileList, tableDanglingFileList.subList(1, 2));
+  }
+
+  @Test
+  public void testDryRunWithBrokenInstants() throws IOException {
+    List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
+    String backupPath = backupTempDir.toAbsolutePath().toString();
+    List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
+        .map(filePath -> new Path(backupPath, filePath).toString())
+        .collect(Collectors.toList());
+    List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
+    existingFileList.addAll(tableDanglingFileList);
+
+    testRepairToolWithMode(
+        Option.empty(), Option.empty(), HoodieRepairTool.Mode.DRY_RUN.toString(),
+        backupPath, true,
+        existingFileList, backupDanglingFileList);
+  }
+
+  @Test
+  public void testDryRunWithOneBrokenInstant() throws IOException {
+    List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath);
+    String backupPath = backupTempDir.toAbsolutePath().toString();
+    List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
+        .map(filePath -> new Path(backupPath, filePath).toString())
+        .collect(Collectors.toList());
+    List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
+    existingFileList.addAll(tableDanglingFileList);
+
+    testRepairToolWithMode(
+        Option.of("001"), Option.empty(), HoodieRepairTool.Mode.DRY_RUN.toString(),
+        backupPath, true,
+        existingFileList, backupDanglingFileList);
+  }
+
+  @Test
+  public void testUndoWithNonExistentBackupPath() throws IOException {
+    String backupPath = backupTempDir.toAbsolutePath().toString();
+    metaClient.getFs().delete(new Path(backupPath), true);
+
+    testRepairToolWithMode(
+        Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(),
+        backupPath, false,
+        allFileAbsolutePathList, Collections.emptyList());
+  }
+
+  @Test
+  public void testUndoWithExistingBackupPath() throws IOException {
+    String backupPath = backupTempDir.toAbsolutePath().toString();
+    List<String> backupDanglingFileList = createDanglingDataFilesInFS(backupPath);
+    List<String> restoreDanglingFileList = DANGLING_DATA_FILE_LIST.stream()
+        .map(filePath -> new Path(basePath, filePath).toString())
+        .collect(Collectors.toList());
+    List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList);
+    existingFileList.addAll(backupDanglingFileList);
+    existingFileList.addAll(restoreDanglingFileList);
+
+    verifyFilesInFS(allFileAbsolutePathList, restoreDanglingFileList);
+    verifyFilesInFS(backupDanglingFileList, Collections.emptyList());
+    testRepairToolWithMode(
+        Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(),
+        backupPath, true,
+        existingFileList, Collections.emptyList());
+    // Second run should fail
+    testRepairToolWithMode(
+        Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(),
+        backupPath, false,
+        existingFileList, Collections.emptyList());
+  }
+
+  private void testRepairToolWithMode(
+      Option<String> startingInstantOption, Option<String> endingInstantOption,
+      String runningMode, String backupPath, boolean isRunSuccessful,
+      List<String> existFilePathList, List<String> nonExistFilePathList) throws IOException {
+    HoodieRepairTool.Config config = new HoodieRepairTool.Config();
+    config.backupPath = backupPath;
+    config.basePath = basePath;
+    config.assumeDatePartitioning = true;
+    if (startingInstantOption.isPresent()) {
+      config.startingInstantTime = startingInstantOption.get();
+    }
+    if (endingInstantOption.isPresent()) {
+      config.endingInstantTime = endingInstantOption.get();
+    }
+    config.runningMode = runningMode;
+    HoodieRepairTool tool = new HoodieRepairTool(jsc, config);
+    assertEquals(isRunSuccessful, tool.run());
+    verifyFilesInFS(existFilePathList, nonExistFilePathList);
+  }
+
+  private void verifyFilesInFS(
+      List<String> existFilePathList, List<String> nonExistFilePathList) throws IOException {
+    FileSystem fs = metaClient.getFs();
+
+    for (String filePath : existFilePathList) {
+      assertTrue(fs.exists(new Path(filePath)),
+          String.format("File %s should exist but it's not in the file system", filePath));
+    }
+
+    for (String filePath : nonExistFilePathList) {
+      assertFalse(fs.exists(new Path(filePath)),
+          String.format("File %s should not exist but it's in the file system", filePath));
+    }
+  }
+
+  private List<String> createDanglingDataFilesInFS(String parentPath) {
+    FileSystem fs = metaClient.getFs();
+    return DANGLING_DATA_FILE_LIST.stream().map(relativeFilePath -> {
+      Path path = new Path(parentPath, relativeFilePath);
+      try {
+        fs.mkdirs(path.getParent());
+        if (!fs.exists(path)) {
+          fs.create(path, false);
+        }
+      } catch (IOException e) {
+        LOG.error("Error creating file: " + path);
+      }
+      return path.toString();
+    })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public HoodieEngineContext context() {
+    return context;
+  }
+
+  @Override
+  public SparkSession spark() {
+    return spark;
+  }
+
+  @Override
+  public SQLContext sqlContext() {
+    return sqlContext;
+  }
+
+  @Override
+  public JavaSparkContext jsc() {
+    return jsc;
+  }
+}