You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/06 04:14:20 UTC

[hudi] branch master updated: [HUDI-4619] Add a remote request retry mechanism for 'Remotehoodietablefilesystemview'. (#6393)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2590d2abf1 [HUDI-4619] Add a remote request retry mechanism for 'Remotehoodietablefilesystemview'. (#6393)
2590d2abf1 is described below

commit 2590d2abf13a91eea8d73fb36c069da7efa375b9
Author: HunterXHunter <13...@qq.com>
AuthorDate: Tue Sep 6 12:14:11 2022 +0800

    [HUDI-4619] Add a remote request retry mechanism for 'Remotehoodietablefilesystemview'. (#6393)
---
 .../client/embedded/EmbeddedTimelineService.java   |  5 ++
 .../common/table/view/FileSystemViewManager.java   |  3 +-
 .../table/view/FileSystemViewStorageConfig.java    | 76 ++++++++++++++++++++++
 .../view/RemoteHoodieTableFileSystemView.java      | 45 ++++++++-----
 .../org/apache/hudi/common/util/RetryHelper.java   | 52 +++++++++------
 .../java/org/apache/hudi/util/StreamerUtil.java    |  5 ++
 .../TestRemoteHoodieTableFileSystemView.java       | 46 +++++++++++++
 7 files changed, 194 insertions(+), 38 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 72f8e29c9f..4d5375894d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -117,6 +117,11 @@ public class EmbeddedTimelineService {
         .withRemoteServerHost(hostAddr)
         .withRemoteServerPort(serverPort)
         .withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
+        .withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
+        .withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
+        .withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
+        .withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
+        .withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
         .build();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 35fda6c416..48023d5046 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -214,8 +214,7 @@ public class FileSystemViewManager {
     LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
         + viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout="
         + viewConf.getRemoteTimelineClientTimeoutSecs());
-    return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
-        metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
+    return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
   }
 
   public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 63f10855ba..92937f61e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
       .defaultValue(5 * 60) // 5 min
       .withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");
 
+  public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = ConfigProperty
+          .key("hoodie.filesystem.view.remote.retry.enable")
+          .defaultValue("false")
+          .sinceVersion("0.12.1")
+          .withDocumentation("Whether to enable API request retry for remote file system view.");
+
+  public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = ConfigProperty
+      .key("hoodie.filesystem.view.remote.retry.max_numbers")
+      .defaultValue(3) // 3 times
+      .sinceVersion("0.12.1")
+      .withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server.");
+
+  public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty
+      .key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
+      .defaultValue(100L)
+      .sinceVersion("0.12.1")
+      .withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage.");
+
+  public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty
+      .key("hoodie.filesystem.view.remote.retry.max_interval_ms")
+      .defaultValue(2000L)
+      .sinceVersion("0.12.1")
+      .withDocumentation("Maximum amount of time (in ms), to wait for next retry.");
+
+  public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
+          .key("hoodie.filesystem.view.remote.retry.exceptions")
+          .defaultValue("")
+          .sinceVersion("0.12.1")
+          .withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
+                  + "Default is empty which means retry all the IOException and RuntimeException from Remote Request.");
+
   public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty
       .key("hoodie.filesystem.remote.backup.view.enable")
       .defaultValue("true") // Need to be disabled only for tests.
@@ -144,6 +175,26 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
     return getInt(REMOTE_TIMEOUT_SECS);
   }
 
+  public boolean isRemoteTimelineClientRetryEnabled() {
+    return getBoolean(REMOTE_RETRY_ENABLE);
+  }
+
+  public Integer getRemoteTimelineClientMaxRetryNumbers() {
+    return getInt(REMOTE_MAX_RETRY_NUMBERS);
+  }
+
+  public Long getRemoteTimelineInitialRetryIntervalMs() {
+    return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
+  }
+
+  public Long getRemoteTimelineClientMaxRetryIntervalMs() {
+    return getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
+  }
+
+  public String getRemoteTimelineClientRetryExceptions() {
+    return getString(RETRY_EXCEPTIONS);
+  }
+
   public long getMaxMemoryForFileGroupMap() {
     long totalMemory = getLong(SPILLABLE_MEMORY);
     return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile();
@@ -245,6 +296,31 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
+      fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry));
+      return this;
+    }
+
+    public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) {
+      fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString());
+      return this;
+    }
+
+    public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) {
+      fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString());
+      return this;
+    }
+
+    public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) {
+      fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString());
+      return this;
+    }
+
+    public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) {
+      fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
+      return this;
+    }
+
     public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) {
       fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString());
       return this;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 099b79cbba..0b32211a5b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
 import org.apache.hudi.common.table.timeline.dto.InstantDTO;
 import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RetryHelper;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -128,26 +129,36 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   private final HoodieTableMetaClient metaClient;
   private HoodieTimeline timeline;
   private final ObjectMapper mapper;
-  private final int timeoutSecs;
+  private final int timeoutMs;
 
   private boolean closed = false;
 
+  private RetryHelper<Response> retryHelper;
+
   private enum RequestMethod {
     GET, POST
   }
 
   public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
-    this(server, port, metaClient, 300);
+    this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
   }
 
-  public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) {
+  public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
     this.basePath = metaClient.getBasePath();
-    this.serverHost = server;
-    this.serverPort = port;
     this.mapper = new ObjectMapper();
     this.metaClient = metaClient;
     this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
-    this.timeoutSecs = timeoutSecs;
+    this.serverHost = viewConf.getRemoteViewServerHost();
+    this.serverPort = viewConf.getRemoteViewServerPort();
+    this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
+    if (viewConf.isRemoteTimelineClientRetryEnabled()) {
+      retryHelper =  new RetryHelper(
+              viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+              viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+              viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+              viewConf.getRemoteTimelineClientRetryExceptions(),
+              "Sending request");
+    }
   }
 
   private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
@@ -165,17 +176,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
 
     String url = builder.toString();
     LOG.info("Sending request : (" + url + ")");
-    Response response;
-    int timeout = this.timeoutSecs * 1000; // msec
-    switch (method) {
-      case GET:
-        response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
-        break;
-      case POST:
-      default:
-        response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
-        break;
-    }
+    Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method);
     String content = response.returnContent().asString();
     return (T) mapper.readValue(content, reference);
   }
@@ -495,4 +496,14 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
       throw new HoodieRemoteException(e);
     }
   }
+
+  private Response get(int timeoutMs, String url, RequestMethod method) throws IOException {
+    switch (method) {
+      case GET:
+        return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
+      case POST:
+      default:
+        return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
index 067c5ee40d..2e82b548f0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
@@ -18,28 +18,27 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.Collectors;
 
-public class RetryHelper<T> {
+public class RetryHelper<T> implements Serializable {
   private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
-  private CheckedFunction<T> func;
-  private int num;
-  private long maxIntervalTime;
-  private long initialIntervalTime = 100L;
+  private transient CheckedFunction<T> func;
+  private final int num;
+  private final long maxIntervalTime;
+  private final long initialIntervalTime;
   private String taskInfo = "N/A";
   private List<? extends Class<? extends Exception>> retryExceptionsClasses;
 
-  public RetryHelper() {
-  }
-
   public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
     this.num = maxRetryNumbers;
     this.initialIntervalTime = initialRetryIntervalMs;
@@ -47,23 +46,29 @@ public class RetryHelper<T> {
     if (StringUtils.isNullOrEmpty(retryExceptions)) {
       this.retryExceptionsClasses = new ArrayList<>();
     } else {
-      this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
-          .map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
-          .map(Exception::getClass)
-          .collect(Collectors.toList());
+      try {
+        this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
+                .map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
+                .map(Exception::getClass)
+                .collect(Collectors.toList());
+      } catch (HoodieException e) {
+        LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e);
+        this.retryExceptionsClasses = new ArrayList<>();
+      }
     }
   }
 
-  public RetryHelper(String taskInfo) {
+  public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) {
+    this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions);
     this.taskInfo = taskInfo;
   }
 
-  public RetryHelper tryWith(CheckedFunction<T> func) {
+  public RetryHelper<T> tryWith(CheckedFunction<T> func) {
     this.func = func;
     return this;
   }
 
-  public T start() throws IOException {
+  public T start(CheckedFunction<T> func) throws IOException {
     int retries = 0;
     T functionResult = null;
 
@@ -77,14 +82,18 @@ public class RetryHelper<T> {
           throw e;
         }
         if (retries++ >= num) {
-          LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
+          String message = "Still failed to " + taskInfo + " after retried " + num + " times.";
+          LOG.error(message, e);
+          if (e instanceof IOException) {
+            throw new IOException(message, e);
+          }
           throw e;
         }
-        LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
+        LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e);
         try {
           Thread.sleep(waitTime);
         } catch (InterruptedException ex) {
-            // ignore InterruptedException here
+          // ignore InterruptedException here
         }
       }
     }
@@ -92,9 +101,14 @@ public class RetryHelper<T> {
     if (retries > 0) {
       LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
     }
+
     return functionResult;
   }
 
+  public T start() throws IOException {
+    return start(this.func);
+  }
+
   private boolean checkIfExceptionInRetryList(Exception e) {
     boolean inRetryList = false;
 
@@ -123,7 +137,7 @@ public class RetryHelper<T> {
   }
 
   @FunctionalInterface
-  public interface CheckedFunction<T> {
+  public interface CheckedFunction<T> extends Serializable {
     T get() throws IOException;
   }
 }
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b09d7ad8bf..4b93faeaf7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -429,6 +429,11 @@ public class StreamerUtil {
         .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
         .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
         .withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
+        .withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
+        .withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
+        .withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
+        .withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
+        .withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
         .build();
     ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
     return writeClient;
diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
index f9a6172b5e..127bc51e95 100644
--- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
+++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
@@ -28,12 +28,16 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieRemoteException;
 import org.apache.hudi.timeline.service.TimelineService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it.
@@ -64,4 +68,46 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst
     view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient);
     return view;
   }
+
+  @Test
+  public void testRemoteHoodieTableFileSystemViewWithRetry() {
+    // Service is available.
+    view.getLatestBaseFiles();
+    // Shut down the service.
+    server.close();
+    try {
+      // Immediately fails and throws a connection refused exception.
+      view.getLatestBaseFiles();
+      fail("Should be catch Exception 'Connection refused (Connection refused)'");
+    } catch (HoodieRemoteException e) {
+      assert e.getMessage().contains("Connection refused (Connection refused)");
+    }
+    // Enable API request retry for remote file system view.
+    view =  new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
+            .newBuilder()
+            .withRemoteServerHost("localhost")
+            .withRemoteServerPort(server.getServerPort())
+            .withRemoteTimelineClientRetry(true)
+            .withRemoteTimelineClientMaxRetryIntervalMs(2000L)
+            .withRemoteTimelineClientMaxRetryNumbers(4)
+            .build());
+    try {
+      view.getLatestBaseFiles();
+      fail("Should be catch Exception 'Still failed to Sending request after retried 4 times.'");
+    } catch (HoodieRemoteException e) {
+      assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times.");
+    }
+    // Retry succeed after 2 or 3 tries.
+    new Thread(() -> {
+      try {
+        Thread.sleep(5000L);
+        LOG.info("Restart server.");
+        server.startService();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }).run();
+    view.getLatestBaseFiles();
+    server.close();
+  }
 }