You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2024/02/08 17:05:18 UTC

(pinot) branch master updated: Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317)

This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe25ba2e2 Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317)
3fe25ba2e2 is described below

commit 3fe25ba2e2babe984ac94606eddb4ab6a8b47191
Author: Ting Chen <ti...@uber.com>
AuthorDate: Thu Feb 8 09:05:12 2024 -0800

    Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317)
    
    * Retry peer discovery during peer segment download.
    
    * Move the segment finder to pinot-common to avoid circular dependencies
    
    * Add unit tests and refactor the code.
    
    * Fix lint issues.
    
    * Fix long lines for lint issues
    
    * Fix lint issues
    
    * Fix lint issues
    
    * Fix lint issues
    
    * Fix compilation issues.
    
    * Fix lint.
    
    * Remove a redundant code branch.
    
    * Fix based on comments.
    
    * Remove unused imports.
    
    * Fix lints.
    
    * Fix lints.
    
    * Revise based on comments
    
    * Revise based on comments
    
    * Revise based on comments
---
 .../common/utils/fetcher/BaseSegmentFetcher.java   |  35 +++++
 .../common/utils/fetcher/HttpSegmentFetcher.java   |  23 ++-
 .../pinot/common/utils/fetcher/SegmentFetcher.java |   9 ++
 .../pinot/core/util/PeerServerSegmentFinder.java   |   0
 .../utils/fetcher/HttpSegmentFetcherTest.java      | 171 +++++++++++++++++++++
 .../utils/fetcher/SegmentFetcherFactoryTest.java   |   8 +-
 .../core/data/manager/BaseTableDataManager.java    |  10 --
 .../manager/realtime/RealtimeTableDataManager.java |  15 +-
 8 files changed, 253 insertions(+), 18 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index d1756147d4..9deca98343 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -109,6 +110,40 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * @param segmentName the name of the segment to fetch.
+   * @param uriSupplier the supplier to the list of segment download uris.
+   * @param dest        The destination to put the downloaded segment.
+   * @throws Exception when the segment fetch fails after all attempts are exhausted or other runtime exceptions occur.
+   * This method keeps retrying (with exponential backoff), going through the list of download uris to fetch the
+   * segment until the retry limit is reached.
+   *
+   */
+  @Override
+  public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) throws Exception {
+    try {
+      int attempt =
+          RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+            List<URI> suppliedURIs = uriSupplier.get();
+            // Go through the list of URIs to fetch the segment until success.
+            for (URI uri : suppliedURIs) {
+              try {
+                fetchSegmentToLocalWithoutRetry(uri, dest);
+                return true;
+              } catch (Exception e) {
+                _logger.warn("Download segment {} from peer {} failed.", segmentName, uri, e);
+              }
+            }
+            // None of the URIs worked. Return false to trigger a retry.
+            return false;
+          });
+      _logger.info("Download segment {} successfully with {} attempts.", segmentName, attempt + 1);
+    } catch (Exception e) {
+      _logger.error("Failed to download segment {} after retries.", segmentName, e);
+      throw e;
+    }
+  }
+
   /**
    * Fetches a segment from URI location to local without retry. Sub-class should override this or
    * {@link #fetchSegmentToLocal(URI, File)}.
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index be73c1908b..2b986259b5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils.fetcher;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.net.InetAddresses;
 import java.io.File;
 import java.io.IOException;
@@ -48,6 +49,20 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
     _httpClient = new FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
   }
 
+  public HttpSegmentFetcher() {
+  }
+
+  @VisibleForTesting
+  protected HttpSegmentFetcher(FileUploadDownloadClient httpClient, PinotConfiguration config) {
+    _httpClient = httpClient;
+    _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT);
+    _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS);
+    _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR);
+    _logger
+        .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
+            _retryDelayScaleFactor);
+  }
+
   @Override
   public void fetchSegmentToLocal(URI downloadURI, File dest)
       throws Exception {
@@ -174,8 +189,12 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
       throws Exception {
     try {
       int statusCode = _httpClient.downloadFile(uri, dest, _authProvider);
-      _logger.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
-          statusCode);
+      _logger.info("Try to download the segment from: {} to: {} of size: {}; Response status code: {}", uri, dest,
+          dest.length(), statusCode);
+      // In case of download failure, throw exception.
+      if (statusCode >= 300) {
+        throw new HttpErrorStatusException("Failed to download segment", statusCode);
+      }
     } catch (Exception e) {
       _logger.warn("Caught exception while downloading segment from: {} to: {}", uri, dest, e);
       throw e;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
index 78d720751f..96961e4a08 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -49,4 +50,12 @@ public interface SegmentFetcher {
    */
   void fetchSegmentToLocal(List<URI> uri, File dest)
       throws Exception;
+
+  /**
+   * @param segmentName the segment name to fetch.
+   * @param uriSupplier the supplier to the list of segment download uris.
+   * @param dest        The destination to put the downloaded segment.
+   * @throws Exception when the segment fetch fails after all attempts are exhausted or other runtime exceptions occur.
+   */
+  void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) throws Exception;
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
similarity index 100%
rename from pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
rename to pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
new file mode 100644
index 0000000000..3159168dab
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.util.PeerServerSegmentFinder;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.mockito.MockedStatic;
+import org.testng.Assert;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+
+public class HttpSegmentFetcherTest {
+  private MockedStatic<PeerServerSegmentFinder> _peerServerSegmentFinder = mockStatic(PeerServerSegmentFinder.class);
+  private PinotConfiguration _fetcherConfig;
+
+  @BeforeSuite
+  public void initTest() {
+    _fetcherConfig = new PinotConfiguration();
+    _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3);
+  }
+
+  @Test
+  public void testFetchSegmentToLocalSucceedAtFirstAttempt()
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
+    when(client.downloadFile(any(), any(), any())).thenReturn(200);
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig);
+    HelixManager helixManager = mock(HelixManager.class);
+
+    List<URI> uris = new ArrayList<>();
+    uris.add(new URI("http://h1:8080"));
+    uris.add(new URI("http://h2:8080"));
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(uris);
+    try {
+      httpSegmentFetcher.fetchSegmentToLocal("seg",
+          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file"));
+    } catch (Exception e) {
+      // If we reach here, the download fails.
+      Assert.assertTrue(false, "Download segment failed");
+      Assert.assertTrue(e instanceof AttemptsExceededException);
+    }
+    _peerServerSegmentFinder.reset();
+  }
+
+  @Test
+  public void testFetchSegmentToLocalAllDownloadAttemptsFailed()
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
+    // All three attempts fail.
+    when(client.downloadFile(any(), any(), any())).thenReturn(300).thenReturn(300).thenReturn(300);
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig);
+    HelixManager helixManager = mock(HelixManager.class);
+    List<URI> uris = new ArrayList<>();
+    uris.add(new URI("http://h1:8080"));
+    uris.add(new URI("http://h2:8080"));
+
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(uris);
+    try {
+      httpSegmentFetcher.fetchSegmentToLocal("seg",
+          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file"));
+      // The test should not reach here because the fetch will throw exception.
+      Assert.assertTrue(false, "Download segment failed");
+    } catch (Exception e) {
+      // If we reach here, the download fails.
+      Assert.assertTrue(true, "Download segment failed");
+    }
+  }
+
+  @Test
+  public void testFetchSegmentToLocalSuccessAfterRetry()
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
+    // The first two download calls fail; the third call succeeds.
+    when(client.downloadFile(any(), any(), any())).thenReturn(300).thenReturn(300).thenReturn(200);
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig);
+    HelixManager helixManager = mock(HelixManager.class);
+    List<URI> uris = new ArrayList<>();
+    uris.add(new URI("http://h1:8080"));
+    uris.add(new URI("http://h2:8080"));
+
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(uris);
+    try {
+      httpSegmentFetcher.fetchSegmentToLocal("seg",
+          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file"));
+    } catch (Exception e) {
+      // If we reach here, the download fails.
+      Assert.assertTrue(false, "Download segment failed");
+    }
+  }
+
+  @Test
+  public void testFetchSegmentToLocalSuccessAfterFirstTwoAttemptsFoundNoPeerServers()
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
+    //  The download always succeeds.
+    when(client.downloadFile(any(), any(), any())).thenReturn(200);
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig);
+    HelixManager helixManager = mock(HelixManager.class);
+    List<URI> uris = new ArrayList<>();
+    uris.add(new URI("http://h1:8080"));
+    uris.add(new URI("http://h2:8080"));
+
+    // The first two attempts find NO peers hosting the segment, but the last one finds two servers.
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris);
+    try {
+      httpSegmentFetcher.fetchSegmentToLocal("seg",
+          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file"));
+    } catch (Exception e) {
+      // If we reach here, the download fails.
+      Assert.assertTrue(false, "Download segment failed");
+    }
+  }
+
+  @Test
+  public void testFetchSegmentToLocalFailureWithNoPeerServers()
+      throws IOException, HttpErrorStatusException {
+    FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
+    // the download always succeeds.
+    when(client.downloadFile(any(), any(), any())).thenReturn(200);
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig);
+    HelixManager helixManager = mock(HelixManager.class);
+
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(List.of()).thenReturn(List.of()).thenReturn(List.of());
+    try {
+      httpSegmentFetcher.fetchSegmentToLocal("seg",
+          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file"));
+      // The test should not reach here because the fetch will throw exception.
+      Assert.assertTrue(false, "Download segment failed");
+    } catch (Exception e) {
+      Assert.assertTrue(true, "Download segment failed");
+      Assert.assertTrue(e instanceof AttemptsExceededException);
+    }
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
index 389ccdc6d6..bc20cb84e1 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -125,7 +126,12 @@ public class SegmentFetcherFactoryTest {
     }
 
     @Override
-    public void fetchSegmentToLocal(List<URI> uri, File dest)
+    public void fetchSegmentToLocal(List<URI> uri, File dest) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest)
         throws Exception {
       throw new UnsupportedOperationException();
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 191ea04a0d..bbad463d1d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -40,8 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.configuration2.Configuration;
-import org.apache.commons.configuration2.ConfigurationConverter;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -75,7 +73,6 @@ import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.slf4j.Logger;
@@ -954,11 +951,4 @@ public abstract class BaseTableDataManager implements TableDataManager {
       }
     }
   }
-
-  private static PinotConfiguration toPinotConfiguration(Configuration configuration) {
-    if (configuration == null) {
-      return new PinotConfiguration();
-    }
-    return new PinotConfiguration((Map<String, Object>) (Map) ConfigurationConverter.getMap(configuration));
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 37e9d88f68..d974663b20 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -658,11 +659,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     try {
       tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
       File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      // First find servers hosting the segment in a ONLINE state.
-      List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager);
-      // Next download the segment from a randomly chosen server using configured scheme.
-      SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(peerSegmentURIs, segmentTarFile);
-      _logger.info("Fetched segment {} from: {} to: {} of size: {}", segmentName, peerSegmentURIs, segmentTarFile,
+      // Download the segment from a randomly chosen server using the configured download scheme (http or https).
+      SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(segmentName,
+          () -> {
+            List<URI> peerServerURIs =
+                PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager);
+            Collections.shuffle(peerServerURIs);
+            return peerServerURIs;
+          }, segmentTarFile);
+      _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile,
           segmentTarFile.length());
       untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
     } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org