You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/20 20:31:55 UTC
[hudi] branch master updated: [HUDI-2648] Retry FileSystem action instead of failed directly. (#3887)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 359fbfd [HUDI-2648] Retry FileSystem action instead of failed directly. (#3887)
359fbfd is described below
commit 359fbfde798b50edc06ee1d0520efcd971a289bc
Author: YueZhang <69...@users.noreply.github.com>
AuthorDate: Mon Feb 21 04:31:31 2022 +0800
[HUDI-2648] Retry FileSystem action instead of failed directly. (#3887)
Co-authored-by: yuezhang <yu...@freewheel.tv>
---
.../org/apache/hudi/cli/commands/SparkMain.java | 3 +-
.../org/apache/hudi/client/BaseHoodieClient.java | 3 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 7 +
.../org/apache/hudi/table/HoodieFlinkTable.java | 3 +-
.../org/apache/hudi/table/HoodieSparkTable.java | 3 +-
.../hudi/common/fs/FileSystemRetryConfig.java | 142 ++++++++++++
.../common/fs/HoodieRetryWrapperFileSystem.java | 257 +++++++++++++++++++++
.../hudi/common/table/HoodieTableMetaClient.java | 33 ++-
.../org/apache/hudi/common/util/RetryHelper.java | 129 +++++++++++
.../org/apache/hudi/common/fs/TestFSUtils.java | 4 +-
.../fs/TestFSUtilsWithRetryWrapperEnable.java | 210 +++++++++++++++++
.../table/timeline/TestHoodieActiveTimeline.java | 1 +
12 files changed, 786 insertions(+), 9 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 1b6d10b..bb7c2d8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -470,7 +470,8 @@ public class SparkMain {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
- .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+ .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
try {
new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.valueOf(toVersion), null);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 8ea61d9..3f208a0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -134,7 +134,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
- .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+ .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
}
public Option<EmbeddedTimelineService> getTimelineServer() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e3efe41..3797efa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FileSystemRetryConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -447,6 +448,7 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc.");
private ConsistencyGuardConfig consistencyGuardConfig;
+ private FileSystemRetryConfig fileSystemRetryConfig;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
// We keep track of original config and rewritten config
@@ -840,6 +842,7 @@ public class HoodieWriteConfig extends HoodieConfig {
newProps.putAll(props);
this.engineType = engineType;
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
+ this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
@@ -1725,6 +1728,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return consistencyGuardConfig;
}
+ public FileSystemRetryConfig getFileSystemRetryConfig() {
+ return fileSystemRetryConfig;
+ }
+
public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
this.consistencyGuardConfig = consistencyGuardConfig;
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 164b00e..2f08a55 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -53,7 +53,8 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
- .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+ .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
return HoodieFlinkTable.create(config, context, metaClient);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 35c9ab3..118438c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -63,7 +63,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
- .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+ .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java
new file mode 100644
index 0000000..c7f99ec
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * The file system retry relevant config options.
+ */
+@ConfigClassProperty(name = "FileSystem Guard Configurations",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "The filesystem retry related config options, to help deal with runtime exception like list/get/put/delete performance issues.")
+public class FileSystemRetryConfig extends HoodieConfig {
+
+ public static final ConfigProperty<String> FILESYSTEM_RETRY_ENABLE = ConfigProperty
+ .key("hoodie.filesystem.operation.retry.enable")
+ .defaultValue("false")
+ .sinceVersion("0.11.0")
+ .withDocumentation("Enabled to handle list/get/delete etc file system performance issue.");
+
+ public static final ConfigProperty<Long> INITIAL_RETRY_INTERVAL_MS = ConfigProperty
+ .key("hoodie.filesystem.operation.retry.initial_interval_ms")
+ .defaultValue(100L)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage.");
+
+ public static final ConfigProperty<Long> MAX_RETRY_INTERVAL_MS = ConfigProperty
+ .key("hoodie.filesystem.operation.retry.max_interval_ms")
+ .defaultValue(2000L)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Maximum amount of time (in ms), to wait for next retry.");
+
+ public static final ConfigProperty<Integer> MAX_RETRY_NUMBERS = ConfigProperty
+ .key("hoodie.filesystem.operation.retry.max_numbers")
+ .defaultValue(4)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Maximum number of retry actions to perform, with exponential backoff.");
+
+ public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
+ .key("hoodie.filesystem.operation.retry.exceptions")
+ .defaultValue("")
+ .sinceVersion("0.11.0")
+ .withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
+ + "Default is empty which means retry all the IOException and RuntimeException from FileSystem");
+
+ private FileSystemRetryConfig() {
+ super();
+ }
+
+ public long getInitialRetryIntervalMs() {
+ return getLong(INITIAL_RETRY_INTERVAL_MS);
+ }
+
+ public long getMaxRetryIntervalMs() {
+ return getLong(MAX_RETRY_INTERVAL_MS);
+ }
+
+ public int getMaxRetryNumbers() {
+ return getInt(MAX_RETRY_NUMBERS);
+ }
+
+ public boolean isFileSystemActionRetryEnable() {
+ return Boolean.parseBoolean(getStringOrDefault(FILESYSTEM_RETRY_ENABLE));
+ }
+
+ public static FileSystemRetryConfig.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public String getRetryExceptions() {
+ return getString(RETRY_EXCEPTIONS);
+ }
+
+ /**
+ * The builder used to build filesystem configurations.
+ */
+ public static class Builder {
+
+ private final FileSystemRetryConfig fileSystemRetryConfig = new FileSystemRetryConfig();
+
+ public Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ fileSystemRetryConfig.getProps().load(reader);
+ return this;
+ }
+ }
+
+ public Builder fromProperties(Properties props) {
+ this.fileSystemRetryConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public Builder withMaxRetryNumbers(int numbers) {
+ fileSystemRetryConfig.setValue(MAX_RETRY_NUMBERS, String.valueOf(numbers));
+ return this;
+ }
+
+ public Builder withInitialRetryIntervalMs(long intervalMs) {
+ fileSystemRetryConfig.setValue(INITIAL_RETRY_INTERVAL_MS, String.valueOf(intervalMs));
+ return this;
+ }
+
+ public Builder withMaxRetryIntervalMs(long intervalMs) {
+ fileSystemRetryConfig.setValue(MAX_RETRY_INTERVAL_MS, String.valueOf(intervalMs));
+ return this;
+ }
+
+ public Builder withFileSystemActionRetryEnabled(boolean enabled) {
+ fileSystemRetryConfig.setValue(FILESYSTEM_RETRY_ENABLE, String.valueOf(enabled));
+ return this;
+ }
+
+ public FileSystemRetryConfig build() {
+ fileSystemRetryConfig.setDefaults(FileSystemRetryConfig.class.getName());
+ return fileSystemRetryConfig;
+ }
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java
new file mode 100644
index 0000000..075f811
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hudi.common.util.RetryHelper;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.EnumSet;
+
+public class HoodieRetryWrapperFileSystem extends FileSystem {
+
+ private FileSystem fileSystem;
+ private long maxRetryIntervalMs;
+ private int maxRetryNumbers;
+ private long initialRetryIntervalMs;
+ private String retryExceptionsList;
+
+ public HoodieRetryWrapperFileSystem(FileSystem fs, long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
+ this.fileSystem = fs;
+ this.maxRetryIntervalMs = maxRetryIntervalMs;
+ this.maxRetryNumbers = maxRetryNumbers;
+ this.initialRetryIntervalMs = initialRetryIntervalMs;
+ this.retryExceptionsList = retryExceptions;
+
+ }
+
+ @Override
+ public URI getUri() {
+ return fileSystem.getUri();
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f, bufferSize)).start();
+ }
+
+ @Override
+ public FSDataInputStream open(Path f) throws IOException {
+ return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.create(f, permission, overwrite, bufferSize, replication, blockSize, progress)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, overwrite)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, progress)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, short replication) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication, progress)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.create(f, overwrite, bufferSize)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
+ throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, progress)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize, progress)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication, blockSize, progress)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
+ short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication,
+ blockSize, progress, checksumOpt)).start();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
+ throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize)).start();
+ }
+
+ @Override
+ public boolean createNewFile(Path f) throws IOException {
+ return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.createNewFile(f)).start();
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize, progress)).start();
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f)).start();
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
+ return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize)).start();
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.rename(src, dst)).start();
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, recursive)).start();
+ }
+
+ @Override
+ public boolean delete(Path f) throws IOException {
+ return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, true)).start();
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f)).start();
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
+ return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f, filter)).start();
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path[] files) throws IOException {
+ return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files)).start();
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
+ return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files, filter)).start();
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern)).start();
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
+ return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern, filter)).start();
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {
+ return (RemoteIterator<LocatedFileStatus>) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listLocatedStatus(f)).start();
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
+ return (RemoteIterator<LocatedFileStatus>) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
+ .tryWith(() -> fileSystem.listFiles(f, recursive)).start();
+ }
+
+ @Override
+ public void setWorkingDirectory(Path newDir) {
+ fileSystem.setWorkingDirectory(newDir);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return fileSystem.getWorkingDirectory();
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.mkdirs(f, permission)).start();
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return (FileStatus) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.getFileStatus(f)).start();
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.exists(f)).start();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return fileSystem.getConf();
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 5ad3b32..740d569 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
+import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.fs.HoodieRetryWrapperFileSystem;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -100,12 +102,14 @@ public class HoodieTableMetaClient implements Serializable {
private HoodieActiveTimeline activeTimeline;
private HoodieArchivedTimeline archivedTimeline;
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
+ private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
- String payloadClassName) {
+ String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) {
LOG.info("Loading HoodieTableMetaClient from " + basePath);
this.consistencyGuardConfig = consistencyGuardConfig;
+ this.fileSystemRetryConfig = fileSystemRetryConfig;
this.hadoopConf = new SerializableConfiguration(conf);
Path basePathDir = new Path(basePath);
this.basePath = basePathDir.toString();
@@ -141,7 +145,8 @@ public class HoodieTableMetaClient implements Serializable {
public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
- .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build();
+ .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null)
+ .setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build();
}
/**
@@ -256,6 +261,14 @@ public class HoodieTableMetaClient implements Serializable {
public HoodieWrapperFileSystem getFs() {
if (fs == null) {
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
+
+ if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) {
+ fileSystem = new HoodieRetryWrapperFileSystem(fileSystem,
+ fileSystemRetryConfig.getMaxRetryIntervalMs(),
+ fileSystemRetryConfig.getMaxRetryNumbers(),
+ fileSystemRetryConfig.getInitialRetryIntervalMs(),
+ fileSystemRetryConfig.getRetryExceptions());
+ }
ValidationUtils.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
"File System not expected to be that of HoodieWrapperFileSystem");
fs = new HoodieWrapperFileSystem(fileSystem,
@@ -266,6 +279,10 @@ public class HoodieTableMetaClient implements Serializable {
return fs;
}
+ public void setFs(HoodieWrapperFileSystem fs) {
+ this.fs = fs;
+ }
+
/**
* Return raw file-system.
*
@@ -305,6 +322,10 @@ public class HoodieTableMetaClient implements Serializable {
return consistencyGuardConfig;
}
+ public FileSystemRetryConfig getFileSystemRetryConfig() {
+ return fileSystemRetryConfig;
+ }
+
/**
* Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
* This should not be used, unless for historical debugging purposes.
@@ -578,6 +599,7 @@ public class HoodieTableMetaClient implements Serializable {
private boolean loadActiveTimelineOnLoad = false;
private String payloadClassName = null;
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
+ private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
public Builder setConf(Configuration conf) {
@@ -605,6 +627,11 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
+ public Builder setFileSystemRetryConfig(FileSystemRetryConfig fileSystemRetryConfig) {
+ this.fileSystemRetryConfig = fileSystemRetryConfig;
+ return this;
+ }
+
public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
this.layoutVersion = layoutVersion;
return this;
@@ -614,7 +641,7 @@ public class HoodieTableMetaClient implements Serializable {
ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
return new HoodieTableMetaClient(conf, basePath,
- loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName);
+ loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
new file mode 100644
index 0000000..067c5ee
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class RetryHelper<T> {
+ private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
+ private CheckedFunction<T> func;
+ private int num;
+ private long maxIntervalTime;
+ private long initialIntervalTime = 100L;
+ private String taskInfo = "N/A";
+ private List<? extends Class<? extends Exception>> retryExceptionsClasses;
+
+ public RetryHelper() {
+ }
+
+ public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
+ this.num = maxRetryNumbers;
+ this.initialIntervalTime = initialRetryIntervalMs;
+ this.maxIntervalTime = maxRetryIntervalMs;
+ if (StringUtils.isNullOrEmpty(retryExceptions)) {
+ this.retryExceptionsClasses = new ArrayList<>();
+ } else {
+ this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
+ .map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
+ .map(Exception::getClass)
+ .collect(Collectors.toList());
+ }
+ }
+
+ public RetryHelper(String taskInfo) {
+ this.taskInfo = taskInfo;
+ }
+
+ public RetryHelper tryWith(CheckedFunction<T> func) {
+ this.func = func;
+ return this;
+ }
+
+ public T start() throws IOException {
+ int retries = 0;
+ T functionResult = null;
+
+ while (true) {
+ long waitTime = Math.min(getWaitTimeExp(retries), maxIntervalTime);
+ try {
+ functionResult = func.get();
+ break;
+ } catch (IOException | RuntimeException e) {
+ if (!checkIfExceptionInRetryList(e)) {
+ throw e;
+ }
+ if (retries++ >= num) {
+ LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
+ throw e;
+ }
+ LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException ex) {
+ // ignore InterruptedException here
+ }
+ }
+ }
+
+ if (retries > 0) {
+ LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
+ }
+ return functionResult;
+ }
+
+ private boolean checkIfExceptionInRetryList(Exception e) {
+ boolean inRetryList = false;
+
+ // if users didn't set hoodie.filesystem.operation.retry.exceptions
+ // we will retry all the IOException and RuntimeException
+ if (retryExceptionsClasses.isEmpty()) {
+ return true;
+ }
+
+ for (Class<? extends Exception> clazz : retryExceptionsClasses) {
+ if (clazz.isInstance(e)) {
+ inRetryList = true;
+ break;
+ }
+ }
+ return inRetryList;
+ }
+
+ private long getWaitTimeExp(int retryCount) {
+ Random random = new Random();
+ if (0 == retryCount) {
+ return initialIntervalTime;
+ }
+
+ return (long) Math.pow(2, retryCount) * initialIntervalTime + random.nextInt(100);
+ }
+
+ @FunctionalInterface
+ public interface CheckedFunction<T> {
+ T get() throws IOException;
+ }
+}
\ No newline at end of file
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index e4460ce..f51702a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -68,8 +68,8 @@ public class TestFSUtils extends HoodieCommonTestHarness {
private final long minRollbackToKeep = 10;
private final long minCleanToKeep = 10;
- private static final String TEST_WRITE_TOKEN = "1-0-1";
- private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+ private static String TEST_WRITE_TOKEN = "1-0-1";
+ public static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
new file mode 100644
index 0000000..0b849eb
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version loop.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-loop.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests file system utils with retry wrapper enable.
+ * P.S extends TestFSUtils and setUp a HoodieWrapperFileSystem for metaClient which can test all the TestFSUtils uts with RetryWrapperEnable
+ */
+public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
+
+ private static final String EXCEPTION_MESSAGE = "Fake runtime exception here.";
+ private long maxRetryIntervalMs;
+ private int maxRetryNumbers;
+ private long initialRetryIntervalMs;
+
+ @Override
+ @BeforeEach
+ public void setUp() throws IOException {
+ initMetaClient();
+ basePath = "file:" + basePath;
+ FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().withFileSystemActionRetryEnabled(true).build();
+ maxRetryIntervalMs = fileSystemRetryConfig.getMaxRetryIntervalMs();
+ maxRetryNumbers = fileSystemRetryConfig.getMaxRetryNumbers();
+ initialRetryIntervalMs = fileSystemRetryConfig.getInitialRetryIntervalMs();
+
+ FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 2);
+ FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
+
+ HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard());
+ metaClient.setFs(fs);
+ }
+
+ // Test the scenario that fs keeps retrying until it fails.
+ @Test
+ public void testProcessFilesWithExceptions() throws Exception {
+ FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 100);
+ FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
+ HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard());
+ metaClient.setFs(fs);
+ List<String> folders =
+ Arrays.asList("2016/04/15", ".hoodie/.temp/2/2016/04/15");
+ folders.forEach(f -> assertThrows(RuntimeException.class, () -> metaClient.getFs().mkdirs(new Path(new Path(basePath), f))));
+ }
+
+ /**
+ * Fake remote FileSystem which will throw RuntimeException something like AmazonS3Exception 503.
+ */
+ class FakeRemoteFileSystem extends FileSystem {
+
+ private FileSystem fs;
+ private int count = 1;
+ private int loop;
+
+ public FakeRemoteFileSystem(FileSystem fs, int retryLoop) {
+ this.fs = fs;
+ this.loop = retryLoop;
+ }
+
+ @Override
+ public URI getUri() {
+ return fs.getUri();
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.open(f, bufferSize);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.append(f, bufferSize, progress);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.rename(src, dst);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.delete(f, recursive);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.listStatus(f);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public void setWorkingDirectory(Path newDir) {
+ fs.setWorkingDirectory(newDir);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return fs.getWorkingDirectory();
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.mkdirs(f, permission);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ if (count % loop == 0) {
+ count++;
+ return fs.getFileStatus(f);
+ } else {
+ count++;
+ throw new RuntimeException(EXCEPTION_MESSAGE);
+ }
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {
+ return fs.listLocatedStatus(f);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return fs.getConf();
+ }
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 9d89c2a..576cfd7 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -126,6 +126,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig())
+ .setFileSystemRetryConfig(metaClient.getFileSystemRetryConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build());
// Old Timeline writes both to aux and timeline folder
oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));