You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/20 20:31:55 UTC

[hudi] branch master updated: [HUDI-2648] Retry FileSystem action instead of failed directly. (#3887)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 359fbfd  [HUDI-2648] Retry FileSystem action instead of failed directly. (#3887)
359fbfd is described below

commit 359fbfde798b50edc06ee1d0520efcd971a289bc
Author: YueZhang <69...@users.noreply.github.com>
AuthorDate: Mon Feb 21 04:31:31 2022 +0800

    [HUDI-2648] Retry FileSystem action instead of failed directly. (#3887)
    
    
    Co-authored-by: yuezhang <yu...@freewheel.tv>
---
 .../org/apache/hudi/cli/commands/SparkMain.java    |   3 +-
 .../org/apache/hudi/client/BaseHoodieClient.java   |   3 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   7 +
 .../org/apache/hudi/table/HoodieFlinkTable.java    |   3 +-
 .../org/apache/hudi/table/HoodieSparkTable.java    |   3 +-
 .../hudi/common/fs/FileSystemRetryConfig.java      | 142 ++++++++++++
 .../common/fs/HoodieRetryWrapperFileSystem.java    | 257 +++++++++++++++++++++
 .../hudi/common/table/HoodieTableMetaClient.java   |  33 ++-
 .../org/apache/hudi/common/util/RetryHelper.java   | 129 +++++++++++
 .../org/apache/hudi/common/fs/TestFSUtils.java     |   4 +-
 .../fs/TestFSUtilsWithRetryWrapperEnable.java      | 210 +++++++++++++++++
 .../table/timeline/TestHoodieActiveTimeline.java   |   1 +
 12 files changed, 786 insertions(+), 9 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 1b6d10b..bb7c2d8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -470,7 +470,8 @@ public class SparkMain {
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
             .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
-            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+            .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
     try {
       new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
           .run(HoodieTableVersion.valueOf(toVersion), null);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 8ea61d9..3f208a0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -134,7 +134,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
   protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
     return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
         .setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
-        .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+        .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
   }
 
   public Option<EmbeddedTimelineService> getTimelineServer() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e3efe41..3797efa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FileSystemRetryConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -447,6 +448,7 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc.");
 
   private ConsistencyGuardConfig consistencyGuardConfig;
+  private FileSystemRetryConfig fileSystemRetryConfig;
 
   // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
   // We keep track of original config and rewritten config
@@ -840,6 +842,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     newProps.putAll(props);
     this.engineType = engineType;
     this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
+    this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build();
     this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
     this.viewStorageConfig = clientSpecifiedViewStorageConfig;
     this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
@@ -1725,6 +1728,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return consistencyGuardConfig;
   }
 
+  public FileSystemRetryConfig getFileSystemRetryConfig() {
+    return fileSystemRetryConfig;
+  }
+
   public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
     this.consistencyGuardConfig = consistencyGuardConfig;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 164b00e..2f08a55 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -53,7 +53,8 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
             .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
-            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+            .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
     return HoodieFlinkTable.create(config, context, metaClient);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 35c9ab3..118438c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -63,7 +63,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
             .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
-            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+            .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
     return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java
new file mode 100644
index 0000000..c7f99ec
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * The file system retry relevant config options.
+ */
+@ConfigClassProperty(name = "FileSystem Guard Configurations",
+        groupName = ConfigGroups.Names.WRITE_CLIENT,
+        description = "The filesystem retry related config options, to help deal with runtime exception like list/get/put/delete performance issues.")
+public class FileSystemRetryConfig  extends HoodieConfig {
+
+  public static final ConfigProperty<String> FILESYSTEM_RETRY_ENABLE = ConfigProperty
+      .key("hoodie.filesystem.operation.retry.enable")
+      .defaultValue("false")
+      .sinceVersion("0.11.0")
+      .withDocumentation("Enabled to handle list/get/delete etc file system performance issue.");
+
+  public static final ConfigProperty<Long> INITIAL_RETRY_INTERVAL_MS = ConfigProperty
+      .key("hoodie.filesystem.operation.retry.initial_interval_ms")
+      .defaultValue(100L)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage.");
+
+  public static final ConfigProperty<Long> MAX_RETRY_INTERVAL_MS = ConfigProperty
+      .key("hoodie.filesystem.operation.retry.max_interval_ms")
+      .defaultValue(2000L)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Maximum amount of time (in ms), to wait for next retry.");
+
+  public static final ConfigProperty<Integer> MAX_RETRY_NUMBERS = ConfigProperty
+      .key("hoodie.filesystem.operation.retry.max_numbers")
+      .defaultValue(4)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Maximum number of retry actions to perform, with exponential backoff.");
+
+  public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
+      .key("hoodie.filesystem.operation.retry.exceptions")
+      .defaultValue("")
+      .sinceVersion("0.11.0")
+      .withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
+          + "Default is empty which means retry all the IOException and RuntimeException from FileSystem");
+
+  private FileSystemRetryConfig() {
+    super();
+  }
+
+  public long getInitialRetryIntervalMs() {
+    return getLong(INITIAL_RETRY_INTERVAL_MS);
+  }
+
+  public long getMaxRetryIntervalMs() {
+    return getLong(MAX_RETRY_INTERVAL_MS);
+  }
+
+  public int getMaxRetryNumbers() {
+    return getInt(MAX_RETRY_NUMBERS);
+  }
+
+  public boolean isFileSystemActionRetryEnable() {
+    return Boolean.parseBoolean(getStringOrDefault(FILESYSTEM_RETRY_ENABLE));
+  }
+
+  public static FileSystemRetryConfig.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public String getRetryExceptions() {
+    return getString(RETRY_EXCEPTIONS);
+  }
+
+  /**
+   * The builder used to build filesystem configurations.
+   */
+  public static class Builder {
+
+    private final FileSystemRetryConfig fileSystemRetryConfig = new FileSystemRetryConfig();
+
+    public Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        fileSystemRetryConfig.getProps().load(reader);
+        return this;
+      }
+    }
+
+    public Builder fromProperties(Properties props) {
+      this.fileSystemRetryConfig.getProps().putAll(props);
+      return this;
+    }
+
+    public Builder withMaxRetryNumbers(int numbers) {
+      fileSystemRetryConfig.setValue(MAX_RETRY_NUMBERS, String.valueOf(numbers));
+      return this;
+    }
+
+    public Builder withInitialRetryIntervalMs(long intervalMs) {
+      fileSystemRetryConfig.setValue(INITIAL_RETRY_INTERVAL_MS, String.valueOf(intervalMs));
+      return this;
+    }
+
+    public Builder withMaxRetryIntervalMs(long intervalMs) {
+      fileSystemRetryConfig.setValue(MAX_RETRY_INTERVAL_MS, String.valueOf(intervalMs));
+      return this;
+    }
+
+    public Builder withFileSystemActionRetryEnabled(boolean enabled) {
+      fileSystemRetryConfig.setValue(FILESYSTEM_RETRY_ENABLE, String.valueOf(enabled));
+      return this;
+    }
+
+    public FileSystemRetryConfig build() {
+      fileSystemRetryConfig.setDefaults(FileSystemRetryConfig.class.getName());
+      return fileSystemRetryConfig;
+    }
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java
new file mode 100644
index 0000000..075f811
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hudi.common.util.RetryHelper;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.EnumSet;
+
+public class HoodieRetryWrapperFileSystem extends FileSystem {
+
+  private FileSystem fileSystem;
+  private long maxRetryIntervalMs;
+  private int maxRetryNumbers;
+  private long initialRetryIntervalMs;
+  private String retryExceptionsList;
+
+  public HoodieRetryWrapperFileSystem(FileSystem fs, long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
+    this.fileSystem = fs;
+    this.maxRetryIntervalMs = maxRetryIntervalMs;
+    this.maxRetryNumbers = maxRetryNumbers;
+    this.initialRetryIntervalMs = initialRetryIntervalMs;
+    this.retryExceptionsList = retryExceptions;
+
+  }
+
+  @Override
+  public URI getUri() {
+    return fileSystem.getUri();
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f, bufferSize)).start();
+  }
+
+  @Override
+  public FSDataInputStream open(Path f) throws IOException {
+    return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f,
+                                   FsPermission permission,
+                                   boolean overwrite,
+                                   int bufferSize,
+                                   short replication,
+                                   long blockSize,
+                                   Progressable progress) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.create(f, permission, overwrite, bufferSize, replication, blockSize, progress)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, overwrite)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, progress)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, short replication) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication, progress)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.create(f, overwrite, bufferSize)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
+      throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, progress)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize,
+                                   Progressable progress) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize, progress)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+                                   short replication, long blockSize, Progressable progress) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication, blockSize, progress)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+                                   short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication,
+                blockSize, progress, checksumOpt)).start();
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
+      throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize)).start();
+  }
+
+  @Override
+  public boolean createNewFile(Path f) throws IOException {
+    return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.createNewFile(f)).start();
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize, progress)).start();
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f)).start();
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+    return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize)).start();
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.rename(src, dst)).start();
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, recursive)).start();
+  }
+
+  @Override
+  public boolean delete(Path f) throws IOException {
+    return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, true)).start();
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+    return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f)).start();
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
+    return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f, filter)).start();
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path[] files) throws IOException {
+    return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files)).start();
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
+    return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files, filter)).start();
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern)).start();
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
+    return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern, filter)).start();
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {
+    return (RemoteIterator<LocatedFileStatus>) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listLocatedStatus(f)).start();
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
+    return (RemoteIterator<LocatedFileStatus>) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+        .tryWith(() -> fileSystem.listFiles(f, recursive)).start();
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    fileSystem.setWorkingDirectory(newDir);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return fileSystem.getWorkingDirectory();
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.mkdirs(f, permission)).start();
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return (FileStatus) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.getFileStatus(f)).start();
+  }
+
+  @Override
+  public boolean exists(Path f) throws IOException {
+    return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.exists(f)).start();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return fileSystem.getConf();
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 5ad3b32..740d569 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.fs.HoodieRetryWrapperFileSystem;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -100,12 +102,14 @@ public class HoodieTableMetaClient implements Serializable {
   private HoodieActiveTimeline activeTimeline;
   private HoodieArchivedTimeline archivedTimeline;
   private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
+  private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
 
   private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
                                 ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
-                                String payloadClassName) {
+                                String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) {
     LOG.info("Loading HoodieTableMetaClient from " + basePath);
     this.consistencyGuardConfig = consistencyGuardConfig;
+    this.fileSystemRetryConfig = fileSystemRetryConfig;
     this.hadoopConf = new SerializableConfiguration(conf);
     Path basePathDir = new Path(basePath);
     this.basePath = basePathDir.toString();
@@ -141,7 +145,8 @@ public class HoodieTableMetaClient implements Serializable {
 
   public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
     return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
-        .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build();
+        .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null)
+        .setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build();
   }
 
   /**
@@ -256,6 +261,14 @@ public class HoodieTableMetaClient implements Serializable {
   public HoodieWrapperFileSystem getFs() {
     if (fs == null) {
       FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
+
+      if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) {
+        fileSystem = new HoodieRetryWrapperFileSystem(fileSystem,
+            fileSystemRetryConfig.getMaxRetryIntervalMs(),
+            fileSystemRetryConfig.getMaxRetryNumbers(),
+            fileSystemRetryConfig.getInitialRetryIntervalMs(),
+            fileSystemRetryConfig.getRetryExceptions());
+      }
       ValidationUtils.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
           "File System not expected to be that of HoodieWrapperFileSystem");
       fs = new HoodieWrapperFileSystem(fileSystem,
@@ -266,6 +279,10 @@ public class HoodieTableMetaClient implements Serializable {
     return fs;
   }
 
+  public void setFs(HoodieWrapperFileSystem fs) {
+    this.fs = fs;
+  }
+
   /**
    * Return raw file-system.
    * 
@@ -305,6 +322,10 @@ public class HoodieTableMetaClient implements Serializable {
     return consistencyGuardConfig;
   }
 
+  public FileSystemRetryConfig getFileSystemRetryConfig() {
+    return fileSystemRetryConfig;
+  }
+
   /**
    * Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
    * This should not be used, unless for historical debugging purposes.
@@ -578,6 +599,7 @@ public class HoodieTableMetaClient implements Serializable {
     private boolean loadActiveTimelineOnLoad = false;
     private String payloadClassName = null;
     private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
+    private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
     private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
 
     public Builder setConf(Configuration conf) {
@@ -605,6 +627,11 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public Builder setFileSystemRetryConfig(FileSystemRetryConfig fileSystemRetryConfig) {
+      this.fileSystemRetryConfig = fileSystemRetryConfig;
+      return this;
+    }
+
     public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
       this.layoutVersion = layoutVersion;
       return this;
@@ -614,7 +641,7 @@ public class HoodieTableMetaClient implements Serializable {
       ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
       ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
       return new HoodieTableMetaClient(conf, basePath,
-          loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName);
+          loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
new file mode 100644
index 0000000..067c5ee
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class RetryHelper<T> {
+  private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
+  private CheckedFunction<T> func;
+  private int num;
+  private long maxIntervalTime;
+  private long initialIntervalTime = 100L;
+  private String taskInfo = "N/A";
+  private List<? extends Class<? extends Exception>> retryExceptionsClasses;
+
+  public RetryHelper() {
+  }
+
+  public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
+    this.num = maxRetryNumbers;
+    this.initialIntervalTime = initialRetryIntervalMs;
+    this.maxIntervalTime = maxRetryIntervalMs;
+    if (StringUtils.isNullOrEmpty(retryExceptions)) {
+      this.retryExceptionsClasses = new ArrayList<>();
+    } else {
+      this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
+          .map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
+          .map(Exception::getClass)
+          .collect(Collectors.toList());
+    }
+  }
+
+  public RetryHelper(String taskInfo) {
+    this.taskInfo = taskInfo;
+  }
+
+  public RetryHelper tryWith(CheckedFunction<T> func) {
+    this.func = func;
+    return this;
+  }
+
+  public T start() throws IOException {
+    int retries = 0;
+    T functionResult = null;
+
+    while (true) {
+      long waitTime = Math.min(getWaitTimeExp(retries), maxIntervalTime);
+      try {
+        functionResult = func.get();
+        break;
+      } catch (IOException | RuntimeException e) {
+        if (!checkIfExceptionInRetryList(e)) {
+          throw e;
+        }
+        if (retries++ >= num) {
+          LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
+          throw e;
+        }
+        LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException ex) {
+            // ignore InterruptedException here
+        }
+      }
+    }
+
+    if (retries > 0) {
+      LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
+    }
+    return functionResult;
+  }
+
+  private boolean checkIfExceptionInRetryList(Exception e) {
+    boolean inRetryList = false;
+
+    // if users didn't set hoodie.filesystem.operation.retry.exceptions
+    // we will retry all the IOException and RuntimeException
+    if (retryExceptionsClasses.isEmpty()) {
+      return true;
+    }
+
+    for (Class<? extends Exception> clazz : retryExceptionsClasses) {
+      if (clazz.isInstance(e)) {
+        inRetryList = true;
+        break;
+      }
+    }
+    return inRetryList;
+  }
+
+  private long getWaitTimeExp(int retryCount) {
+    Random random = new Random();
+    if (0 == retryCount) {
+      return initialIntervalTime;
+    }
+
+    return (long) Math.pow(2, retryCount) * initialIntervalTime + random.nextInt(100);
+  }
+
+  @FunctionalInterface
+  public interface CheckedFunction<T> {
+    T get() throws IOException;
+  }
+}
\ No newline at end of file
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index e4460ce..f51702a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -68,8 +68,8 @@ public class TestFSUtils extends HoodieCommonTestHarness {
   private final long minRollbackToKeep = 10;
   private final long minCleanToKeep = 10;
 
-  private static final String TEST_WRITE_TOKEN = "1-0-1";
-  private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+  public static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
 
   @Rule
   public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
new file mode 100644
index 0000000..0b849eb
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version loop.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-loop.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests file system utils with retry wrapper enable.
+ * P.S extends TestFSUtils and setUp a HoodieWrapperFileSystem for metaClient which can test all the TestFSUtils uts with RetryWrapperEnable
+ */
+public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
+
+  private static final String EXCEPTION_MESSAGE = "Fake runtime exception here.";
+  private long maxRetryIntervalMs;
+  private int maxRetryNumbers;
+  private long initialRetryIntervalMs;
+
+  @Override
+  @BeforeEach
+  public void setUp() throws IOException {
+    initMetaClient();
+    basePath = "file:" + basePath;
+    FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().withFileSystemActionRetryEnabled(true).build();
+    maxRetryIntervalMs = fileSystemRetryConfig.getMaxRetryIntervalMs();
+    maxRetryNumbers = fileSystemRetryConfig.getMaxRetryNumbers();
+    initialRetryIntervalMs = fileSystemRetryConfig.getInitialRetryIntervalMs();
+
+    FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 2);
+    FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
+
+    HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard());
+    metaClient.setFs(fs);
+  }
+
+  // Test the scenario that fs keeps retrying until it fails.
+  @Test
+  public void testProcessFilesWithExceptions() throws Exception {
+    FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 100);
+    FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
+    HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard());
+    metaClient.setFs(fs);
+    List<String> folders =
+            Arrays.asList("2016/04/15", ".hoodie/.temp/2/2016/04/15");
+    folders.forEach(f -> assertThrows(RuntimeException.class, () -> metaClient.getFs().mkdirs(new Path(new Path(basePath), f))));
+  }
+
+  /**
+   * Fake remote FileSystem which will throw RuntimeException something like AmazonS3Exception 503.
+   */
+  class FakeRemoteFileSystem extends FileSystem {
+
+    private FileSystem fs;
+    private int count = 1;
+    private int loop;
+
+    public FakeRemoteFileSystem(FileSystem fs, int retryLoop) {
+      this.fs = fs;
+      this.loop = retryLoop;
+    }
+    
+    @Override
+    public URI getUri() {
+      return fs.getUri();
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.open(f, bufferSize);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.append(f, bufferSize, progress);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.rename(src, dst);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.delete(f, recursive);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.listStatus(f);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public void setWorkingDirectory(Path newDir) {
+      fs.setWorkingDirectory(newDir);
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return fs.getWorkingDirectory();
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.mkdirs(f, permission);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      if (count % loop == 0) {
+        count++;
+        return fs.getFileStatus(f);
+      } else {
+        count++;
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {
+      return fs.listLocatedStatus(f);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return fs.getConf();
+    }
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 9d89c2a..576cfd7 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -126,6 +126,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(
         HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath())
             .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig())
+            .setFileSystemRetryConfig(metaClient.getFileSystemRetryConfig())
             .setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build());
     // Old Timeline writes both to aux and timeline folder
     oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));