You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/06 04:14:20 UTC
[hudi] branch master updated: [HUDI-4619] Add a remote request retry mechanism for 'Remotehoodietablefilesystemview'. (#6393)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2590d2abf1 [HUDI-4619] Add a remote request retry mechanism for 'Remotehoodietablefilesystemview'. (#6393)
2590d2abf1 is described below
commit 2590d2abf13a91eea8d73fb36c069da7efa375b9
Author: HunterXHunter <13...@qq.com>
AuthorDate: Tue Sep 6 12:14:11 2022 +0800
[HUDI-4619] Add a remote request retry mechanism for 'Remotehoodietablefilesystemview'. (#6393)
---
.../client/embedded/EmbeddedTimelineService.java | 5 ++
.../common/table/view/FileSystemViewManager.java | 3 +-
.../table/view/FileSystemViewStorageConfig.java | 76 ++++++++++++++++++++++
.../view/RemoteHoodieTableFileSystemView.java | 45 ++++++++-----
.../org/apache/hudi/common/util/RetryHelper.java | 52 +++++++++------
.../java/org/apache/hudi/util/StreamerUtil.java | 5 ++
.../TestRemoteHoodieTableFileSystemView.java | 46 +++++++++++++
7 files changed, 194 insertions(+), 38 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 72f8e29c9f..4d5375894d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -117,6 +117,11 @@ public class EmbeddedTimelineService {
.withRemoteServerHost(hostAddr)
.withRemoteServerPort(serverPort)
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
+ .withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
+ .withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
+ .withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
+ .withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
+ .withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
.build();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 35fda6c416..48023d5046 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -214,8 +214,7 @@ public class FileSystemViewManager {
LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
+ viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout="
+ viewConf.getRemoteTimelineClientTimeoutSecs());
- return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
- metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
+ return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
}
public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 63f10855ba..92937f61e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
.defaultValue(5 * 60) // 5 min
.withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");
+ public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = ConfigProperty
+ .key("hoodie.filesystem.view.remote.retry.enable")
+ .defaultValue("false")
+ .sinceVersion("0.12.1")
+ .withDocumentation("Whether to enable API request retry for remote file system view.");
+
+ public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = ConfigProperty
+ .key("hoodie.filesystem.view.remote.retry.max_numbers")
+ .defaultValue(3) // 3 times
+ .sinceVersion("0.12.1")
+ .withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server.");
+
+ // Applies to API requests against the remote file system view, not to storage operations.
+ public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty
+ .key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
+ .defaultValue(100L)
+ .sinceVersion("0.12.1")
+ .withDocumentation("Initial amount of time (in ms) to wait before retrying an API request against a remote file system view. e.g timeline server.");
+
+ public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty
+ .key("hoodie.filesystem.view.remote.retry.max_interval_ms")
+ .defaultValue(2000L)
+ .sinceVersion("0.12.1")
+ .withDocumentation("Maximum amount of time (in ms), to wait for next retry.");
+
+ // Comma-separated list; an empty value (the default) does not restrict which exceptions are retried.
+ public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
+ .key("hoodie.filesystem.view.remote.retry.exceptions")
+ .defaultValue("")
+ .sinceVersion("0.12.1")
+ .withDocumentation("The class names of the Exceptions that need to be retried, separated by commas. "
+ + "Default is empty which means retry all the IOException and RuntimeException from Remote Request.");
+
public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty
.key("hoodie.filesystem.remote.backup.view.enable")
.defaultValue("true") // Need to be disabled only for tests.
@@ -144,6 +175,26 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
return getInt(REMOTE_TIMEOUT_SECS);
}
+ /** Returns the boolean value of {@code REMOTE_RETRY_ENABLE}: whether API request retry is enabled for the remote file system view. */
+ public boolean isRemoteTimelineClientRetryEnabled() {
+ return getBoolean(REMOTE_RETRY_ENABLE);
+ }
+
+ /** Returns the value of {@code REMOTE_MAX_RETRY_NUMBERS}: the maximum number of retries for API requests against a remote file system view. */
+ public Integer getRemoteTimelineClientMaxRetryNumbers() {
+ return getInt(REMOTE_MAX_RETRY_NUMBERS);
+ }
+
+ /** Returns the value of {@code REMOTE_INITIAL_RETRY_INTERVAL_MS}: the initial retry interval, in milliseconds. */
+ public Long getRemoteTimelineInitialRetryIntervalMs() {
+ return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
+ }
+
+ /** Returns the value of {@code REMOTE_MAX_RETRY_INTERVAL_MS}: the maximum time, in milliseconds, to wait for the next retry. */
+ public Long getRemoteTimelineClientMaxRetryIntervalMs() {
+ return getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
+ }
+
+ /** Returns the value of {@code RETRY_EXCEPTIONS}: comma-separated exception class names to retry on (empty by default). */
+ public String getRemoteTimelineClientRetryExceptions() {
+ return getString(RETRY_EXCEPTIONS);
+ }
+
public long getMaxMemoryForFileGroupMap() {
long totalMemory = getLong(SPILLABLE_MEMORY);
return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile();
@@ -245,6 +296,31 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
return this;
}
+ /** Stores {@code enableRetry} (as a string) under {@code REMOTE_RETRY_ENABLE} and returns this builder. */
+ public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
+ fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry));
+ return this;
+ }
+
+ /** Stores {@code maxRetryNumbers} under {@code REMOTE_MAX_RETRY_NUMBERS} and returns this builder; the argument must not be null. */
+ public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) {
+ fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString());
+ return this;
+ }
+
+ /** Stores {@code initialRetryIntervalMs} under {@code REMOTE_INITIAL_RETRY_INTERVAL_MS} and returns this builder; the argument must not be null. */
+ public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) {
+ fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString());
+ return this;
+ }
+
+ /** Stores {@code maxRetryIntervalMs} under {@code REMOTE_MAX_RETRY_INTERVAL_MS} and returns this builder; the argument must not be null. */
+ public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) {
+ fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString());
+ return this;
+ }
+
+ /** Stores {@code retryExceptions} (comma-separated exception class names) under {@code RETRY_EXCEPTIONS} and returns this builder. */
+ public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) {
+ fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
+ return this;
+ }
+
public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) {
fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString());
return this;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 099b79cbba..0b32211a5b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RetryHelper;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -128,26 +129,36 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
private final HoodieTableMetaClient metaClient;
private HoodieTimeline timeline;
private final ObjectMapper mapper;
- private final int timeoutSecs;
+ private final int timeoutMs;
private boolean closed = false;
+ private RetryHelper<Response> retryHelper;
+
private enum RequestMethod {
GET, POST
}
public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
- this(server, port, metaClient, 300);
+ this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
}
- public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) {
+ /**
+ * Creates a view that sends its requests to the remote server described by {@code viewConf}.
+ *
+ * @param metaClient meta client supplying the table base path and the active timeline
+ * @param viewConf view storage config supplying server host/port, client timeout and retry settings
+ */
+ public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
this.basePath = metaClient.getBasePath();
- this.serverHost = server;
- this.serverPort = port;
this.mapper = new ObjectMapper();
this.metaClient = metaClient;
this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
- this.timeoutSecs = timeoutSecs;
+ this.serverHost = viewConf.getRemoteViewServerHost();
+ this.serverPort = viewConf.getRemoteViewServerPort();
+ // The config value is in seconds; the HTTP client timeouts are set in milliseconds.
+ this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
+ // The helper stays null when retry is disabled, in which case each request is sent exactly once.
+ if (viewConf.isRemoteTimelineClientRetryEnabled()) {
+ // Diamond operator instead of the raw type, so the assignment to RetryHelper<Response> is checked.
+ retryHelper = new RetryHelper<>(
+ viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+ viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+ viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+ viewConf.getRemoteTimelineClientRetryExceptions(),
+ "Sending request");
+ }
}
private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
@@ -165,17 +176,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
String url = builder.toString();
LOG.info("Sending request : (" + url + ")");
- Response response;
- int timeout = this.timeoutSecs * 1000; // msec
- switch (method) {
- case GET:
- response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
- break;
- case POST:
- default:
- response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
- break;
- }
+ Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method);
String content = response.returnContent().asString();
return (T) mapper.readValue(content, reference);
}
@@ -495,4 +496,14 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
throw new HoodieRemoteException(e);
}
}
+
+ /**
+ * Sends one HTTP request to {@code url}, using {@code timeoutMs} for both the connect and socket timeouts.
+ * {@code GET} is issued as a GET request; every other method is issued as a POST request.
+ */
+ private Response get(int timeoutMs, String url, RequestMethod method) throws IOException {
+ boolean isGet = method.equals(RequestMethod.GET);
+ Request request = isGet ? Request.Get(url) : Request.Post(url);
+ return request.connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
index 067c5ee40d..2e82b548f0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
@@ -18,28 +18,27 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
-public class RetryHelper<T> {
+public class RetryHelper<T> implements Serializable {
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
- private CheckedFunction<T> func;
- private int num;
- private long maxIntervalTime;
- private long initialIntervalTime = 100L;
+ private transient CheckedFunction<T> func;
+ private final int num;
+ private final long maxIntervalTime;
+ private final long initialIntervalTime;
private String taskInfo = "N/A";
private List<? extends Class<? extends Exception>> retryExceptionsClasses;
- public RetryHelper() {
- }
-
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
this.num = maxRetryNumbers;
this.initialIntervalTime = initialRetryIntervalMs;
@@ -47,23 +46,29 @@ public class RetryHelper<T> {
if (StringUtils.isNullOrEmpty(retryExceptions)) {
this.retryExceptionsClasses = new ArrayList<>();
} else {
- this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
- .map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
- .map(Exception::getClass)
- .collect(Collectors.toList());
+ try {
+ this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
+ .map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
+ .map(Exception::getClass)
+ .collect(Collectors.toList());
+ } catch (HoodieException e) {
+ LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e);
+ this.retryExceptionsClasses = new ArrayList<>();
+ }
}
}
- public RetryHelper(String taskInfo) {
+ /**
+ * Delegates to the four-argument constructor and additionally records {@code taskInfo},
+ * the label embedded in this helper's retry log and error messages.
+ */
+ public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) {
+ this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions);
this.taskInfo = taskInfo;
}
- public RetryHelper tryWith(CheckedFunction<T> func) {
+ /** Remembers {@code func} for a later no-argument {@code start()} call and returns this helper for chaining. */
+ public RetryHelper<T> tryWith(CheckedFunction<T> func) {
this.func = func;
return this;
}
- public T start() throws IOException {
+ public T start(CheckedFunction<T> func) throws IOException {
int retries = 0;
T functionResult = null;
@@ -77,14 +82,18 @@ public class RetryHelper<T> {
throw e;
}
if (retries++ >= num) {
- LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
+ String message = "Still failed to " + taskInfo + " after retried " + num + " times.";
+ LOG.error(message, e);
+ if (e instanceof IOException) {
+ throw new IOException(message, e);
+ }
throw e;
}
- LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
+ LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e);
try {
Thread.sleep(waitTime);
} catch (InterruptedException ex) {
- // ignore InterruptedException here
+ // ignore InterruptedException here
}
}
}
@@ -92,9 +101,14 @@ public class RetryHelper<T> {
if (retries > 0) {
LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
}
+
return functionResult;
}
+ /**
+ * Runs the function stored by {@code tryWith} by delegating to {@code start(CheckedFunction)}.
+ * NOTE(review): {@code func} is transient and only set by {@code tryWith}; confirm callers never get here with a null function.
+ */
+ public T start() throws IOException {
+ return start(this.func);
+ }
+
private boolean checkIfExceptionInRetryList(Exception e) {
boolean inRetryList = false;
@@ -123,7 +137,7 @@ public class RetryHelper<T> {
}
+ /** A serializable, supplier-like callback whose {@code get()} may throw {@link IOException}. */
@FunctionalInterface
- public interface CheckedFunction<T> {
+ public interface CheckedFunction<T> extends Serializable {
T get() throws IOException;
}
}
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b09d7ad8bf..4b93faeaf7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -429,6 +429,11 @@ public class StreamerUtil {
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
+ .withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
+ .withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
+ .withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
+ .withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
+ .withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
.build();
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
return writeClient;
diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
index f9a6172b5e..127bc51e95 100644
--- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
+++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
@@ -28,12 +28,16 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.fail;
/**
* Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it.
@@ -64,4 +68,46 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst
view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient);
return view;
}
+
+ /** Exercises the remote view against a running server, a stopped server (without and with retry enabled), and a restarted server. */
+ @Test
+ public void testRemoteHoodieTableFileSystemViewWithRetry() {
+ // Service is available.
+ view.getLatestBaseFiles();
+ // Shut down the service.
+ server.close();
+ try {
+ // Immediately fails and throws a connection refused exception.
+ view.getLatestBaseFiles();
+ fail("Should be catch Exception 'Connection refused (Connection refused)'");
+ } catch (HoodieRemoteException e) {
+ // NOTE(review): plain Java assert; it is only evaluated when the JVM runs with assertions enabled.
+ assert e.getMessage().contains("Connection refused (Connection refused)");
+ }
+ // Enable API request retry for remote file system view.
+ view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
+ .newBuilder()
+ .withRemoteServerHost("localhost")
+ .withRemoteServerPort(server.getServerPort())
+ .withRemoteTimelineClientRetry(true)
+ .withRemoteTimelineClientMaxRetryIntervalMs(2000L)
+ .withRemoteTimelineClientMaxRetryNumbers(4)
+ .build());
+ try {
+ view.getLatestBaseFiles();
+ fail("Should be catch Exception 'Still failed to Sending request after retried 4 times.'");
+ } catch (HoodieRemoteException e) {
+ assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times.");
+ }
+ // NOTE(review): Thread.run() executes the body on the calling thread (unlike start()), so the sleep and restart
+ // below finish before the final request is sent; that request therefore does not exercise the retry path.
+ new Thread(() -> {
+ try {
+ Thread.sleep(5000L);
+ LOG.info("Restart server.");
+ server.startService();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }).run();
+ view.getLatestBaseFiles();
+ server.close();
+ }
}