You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2024/02/08 17:05:18 UTC
(pinot) branch master updated: Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317)
This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe25ba2e2 Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317)
3fe25ba2e2 is described below
commit 3fe25ba2e2babe984ac94606eddb4ab6a8b47191
Author: Ting Chen <ti...@uber.com>
AuthorDate: Thu Feb 8 09:05:12 2024 -0800
Make segment download from Peer servers more robust by retrying both peer discovery and download. (#12317)
* Retry peer discovery during peer segment download.
* Move the segment finder to pinot-common to avoid circular dependencies
* Add unit tests and refactor the code.
* Fix lint issues.
* Fix long lines for lint issues
* Fix lint issues
* Fix lint issues
* Fix lint issues
* Fix compilation issues.
* Fix lint.
* Remove a redundant code branch.
* Fix based on comments.
* Remove unused imports.
* Fix lints.
* Fix lints.
* Revise based on comments
* Revise based on comments
* Revise based on comments
---
.../common/utils/fetcher/BaseSegmentFetcher.java | 35 +++++
.../common/utils/fetcher/HttpSegmentFetcher.java | 23 ++-
.../pinot/common/utils/fetcher/SegmentFetcher.java | 9 ++
.../pinot/core/util/PeerServerSegmentFinder.java | 0
.../utils/fetcher/HttpSegmentFetcherTest.java | 171 +++++++++++++++++++++
.../utils/fetcher/SegmentFetcherFactoryTest.java | 8 +-
.../core/data/manager/BaseTableDataManager.java | 10 --
.../manager/realtime/RealtimeTableDataManager.java | 15 +-
8 files changed, 253 insertions(+), 18 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index d1756147d4..9deca98343 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -109,6 +110,40 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
throw new UnsupportedOperationException();
}
+  /**
+   * Fetches a segment to local, retrying with exponential backoff until the retry limit is reached. On every attempt
+   * the supplier is asked again for the list of download uris, which are then tried in order until one succeeds.
+   *
+   * @param segmentName the name of the segment to fetch.
+   * @param uriSupplier the supplier to the list of segment download uris.
+   * @param dest The destination to put the downloaded segment.
+   * @throws Exception when the segment fetch fails after all attempts are exhausted or other runtime exceptions occur.
+   */
+  @Override
+  public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) throws Exception {
+    try {
+      int attemptIndex =
+          RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+            boolean fetched = false;
+            // Ask the supplier inside the retried operation so that each attempt sees a fresh list of candidates.
+            List<URI> candidates = uriSupplier.get();
+            for (URI candidate : candidates) {
+              try {
+                fetchSegmentToLocalWithoutRetry(candidate, dest);
+                fetched = true;
+                break;
+              } catch (Exception e) {
+                _logger.warn("Download segment {} from peer {} failed.", segmentName, candidate, e);
+              }
+            }
+            // A false result (no candidate worked, or there were none) asks the retry policy for another attempt.
+            return fetched;
+          });
+      _logger.info("Download segment {} successfully with {} attempts.", segmentName, attemptIndex + 1);
+    } catch (Exception e) {
+      _logger.error("Failed to download segment {} after retries.", segmentName, e);
+      throw e;
+    }
+  }
+
/**
* Fetches a segment from URI location to local without retry. Sub-class should override this or
* {@link #fetchSegmentToLocal(URI, File)}.
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index be73c1908b..2b986259b5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils.fetcher;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.InetAddresses;
import java.io.File;
import java.io.IOException;
@@ -48,6 +49,20 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
_httpClient = new FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
}
+ /**
+ * No-arg constructor. Declared explicitly because the parameterized constructor below would otherwise suppress the
+ * implicit default constructor.
+ */
+ public HttpSegmentFetcher() {
+ }
+
+  /**
+   * Test-only constructor: injects the HTTP client and reads the retry settings from the given config, passing the
+   * DEFAULT_* constants as the fallback values.
+   *
+   * @param httpClient the client used to download segments.
+   * @param config the fetcher configuration holding the retry settings.
+   */
+  @VisibleForTesting
+  protected HttpSegmentFetcher(FileUploadDownloadClient httpClient, PinotConfiguration config) {
+    _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT);
+    _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS);
+    _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR);
+    _httpClient = httpClient;
+    _logger.info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount,
+        _retryWaitMs, _retryDelayScaleFactor);
+  }
+
@Override
public void fetchSegmentToLocal(URI downloadURI, File dest)
throws Exception {
@@ -174,8 +189,12 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
throws Exception {
try {
int statusCode = _httpClient.downloadFile(uri, dest, _authProvider);
- _logger.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
- statusCode);
+ _logger.info("Try to download the segment from: {} to: {} of size: {}; Response status code: {}", uri, dest,
+ dest.length(), statusCode);
+ // In case of download failure, throw exception.
+ if (statusCode >= 300) {
+ throw new HttpErrorStatusException("Failed to download segment", statusCode);
+ }
} catch (Exception e) {
_logger.warn("Caught exception while downloading segment from: {} to: {}", uri, dest, e);
throw e;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
index 78d720751f..96961e4a08 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcher.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -49,4 +50,12 @@ public interface SegmentFetcher {
*/
void fetchSegmentToLocal(List<URI> uri, File dest)
throws Exception;
+
+ /**
+ * Fetches a segment to the local destination, taking the candidate download uris from the given supplier.
+ *
+ * @param segmentName the segment name to fetch.
+ * @param uriSupplier the supplier to the list of segment download uris.
+ * @param dest The destination to put the downloaded segment.
+ * @throws Exception when the segment fetch fails after all attempts are exhausted or other runtime exceptions occur.
+ */
+ void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) throws Exception;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
similarity index 100%
rename from pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
rename to pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
new file mode 100644
index 0000000000..3159168dab
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.util.PeerServerSegmentFinder;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.mockito.MockedStatic;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests the supplier-based {@code fetchSegmentToLocal} of {@link HttpSegmentFetcher} with a mocked download client and
+ * a statically mocked {@link PeerServerSegmentFinder}.
+ */
+public class HttpSegmentFetcherTest {
+  private static final String SEGMENT_NAME = "seg";
+  private static final String DOWNLOAD_SCHEME = "http";
+  private static final File DEST = new File("/file");
+
+  private MockedStatic<PeerServerSegmentFinder> _peerServerSegmentFinder;
+  private PinotConfiguration _fetcherConfig;
+
+  @BeforeSuite
+  public void initTest() {
+    _fetcherConfig = new PinotConfiguration();
+    _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3);
+  }
+
+  @BeforeClass
+  public void setUpStaticMock() {
+    // A static mock stays active on the creating thread until it is closed, so it is created here (instead of in a
+    // field initializer) and released in tearDownStaticMock() to keep it from leaking into other test classes.
+    _peerServerSegmentFinder = mockStatic(PeerServerSegmentFinder.class);
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDownStaticMock() {
+    if (_peerServerSegmentFinder != null) {
+      _peerServerSegmentFinder.close();
+    }
+  }
+
+  @BeforeMethod
+  public void resetStaticMock() {
+    // Drop the stubbing of the previous test so that every test starts from a clean static mock.
+    _peerServerSegmentFinder.reset();
+  }
+
+  @Test
+  public void testFetchSegmentToLocalSucceedAtFirstAttempt()
+      throws Exception {
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(mockClient(200), _fetcherConfig);
+    List<URI> uris = twoPeerUris();
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(uris);
+
+    // Any exception propagates and fails the test with its original cause.
+    fetchFromPeers(httpSegmentFetcher);
+  }
+
+  @Test
+  public void testFetchSegmentToLocalAllDownloadAttemptsFailed()
+      throws Exception {
+    // Every download returns 300 (the last stubbed value repeats), so both URIs fail on every attempt.
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(mockClient(300, 300, 300), _fetcherConfig);
+    List<URI> uris = twoPeerUris();
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(uris);
+
+    Assert.assertThrows(AttemptsExceededException.class, () -> fetchFromPeers(httpSegmentFetcher));
+  }
+
+  @Test
+  public void testFetchSegmentToLocalSuccessAfterRetry()
+      throws Exception {
+    // Both URIs of the first attempt return 300; the first URI of the second attempt returns 200.
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(mockClient(300, 300, 200), _fetcherConfig);
+    List<URI> uris = twoPeerUris();
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(uris);
+
+    fetchFromPeers(httpSegmentFetcher);
+  }
+
+  @Test
+  public void testFetchSegmentToLocalSuccessAfterFirstTwoAttemptsFoundNoPeerServers()
+      throws Exception {
+    // The download always succeeds.
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(mockClient(200), _fetcherConfig);
+    List<URI> uris = twoPeerUris();
+    // The first two attempts find NO peers hosting the segment but the last one finds two servers.
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris);
+
+    fetchFromPeers(httpSegmentFetcher);
+  }
+
+  @Test
+  public void testFetchSegmentToLocalFailureWithNoPeerServers()
+      throws Exception {
+    // The download would always succeed, but no attempt ever finds a peer server to download from.
+    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(mockClient(200), _fetcherConfig);
+    _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
+        .thenReturn(List.of()).thenReturn(List.of()).thenReturn(List.of());
+
+    Assert.assertThrows(AttemptsExceededException.class, () -> fetchFromPeers(httpSegmentFetcher));
+  }
+
+  /** Runs the supplier-based fetch, resolving the peers through the (mocked) {@link PeerServerSegmentFinder}. */
+  private static void fetchFromPeers(HttpSegmentFetcher httpSegmentFetcher)
+      throws Exception {
+    HelixManager helixManager = mock(HelixManager.class);
+    httpSegmentFetcher.fetchSegmentToLocal(SEGMENT_NAME,
+        () -> PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_NAME, DOWNLOAD_SCHEME, helixManager), DEST);
+  }
+
+  /** Creates a download client mock that returns the given status codes in order; the last one repeats. */
+  private static FileUploadDownloadClient mockClient(Integer firstStatusCode, Integer... nextStatusCodes)
+      throws IOException, HttpErrorStatusException {
+    FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
+    when(client.downloadFile(any(), any(), any())).thenReturn(firstStatusCode, nextStatusCodes);
+    return client;
+  }
+
+  /** Builds the list of two peer download URIs shared by the tests. */
+  private static List<URI> twoPeerUris()
+      throws URISyntaxException {
+    List<URI> uris = new ArrayList<>();
+    uris.add(new URI("http://h1:8080"));
+    uris.add(new URI("http://h2:8080"));
+    return uris;
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
index 389ccdc6d6..bc20cb84e1 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactoryTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -125,7 +126,12 @@ public class SegmentFetcherFactoryTest {
}
@Override
- public void fetchSegmentToLocal(List<URI> uri, File dest)
+ public void fetchSegmentToLocal(List<URI> uri, File dest) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest)
throws Exception {
throw new UnsupportedOperationException();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 191ea04a0d..bbad463d1d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -40,8 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.configuration2.Configuration;
-import org.apache.commons.configuration2.ConfigurationConverter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -75,7 +73,6 @@ import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.slf4j.Logger;
@@ -954,11 +951,4 @@ public abstract class BaseTableDataManager implements TableDataManager {
}
}
}
-
- private static PinotConfiguration toPinotConfiguration(Configuration configuration) {
- if (configuration == null) {
- return new PinotConfiguration();
- }
- return new PinotConfiguration((Map<String, Object>) (Map) ConfigurationConverter.getMap(configuration));
- }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 37e9d88f68..d974663b20 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -658,11 +659,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
try {
tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- // First find servers hosting the segment in a ONLINE state.
- List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager);
- // Next download the segment from a randomly chosen server using configured scheme.
- SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(peerSegmentURIs, segmentTarFile);
- _logger.info("Fetched segment {} from: {} to: {} of size: {}", segmentName, peerSegmentURIs, segmentTarFile,
+ // Download the segment from peer servers using the configured download scheme (http or https). The supplier below
+ // re-discovers the peer servers and shuffles them every time it is invoked.
+ SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(segmentName,
+ () -> {
+ List<URI> peerServerURIs =
+ PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager);
+ Collections.shuffle(peerServerURIs);
+ return peerServerURIs;
+ }, segmentTarFile);
+ _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile,
segmentTarFile.length());
untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org