You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/10/27 00:49:38 UTC
[pinot] branch master updated: Round Robin IP addresses when retry
uploading/downloading segments (#7585)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0166135 Round Robin IP addresses when retry uploading/downloading segments (#7585)
0166135 is described below
commit 0166135772eb165e52900db558490e77eae767fb
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Tue Oct 26 17:49:24 2021 -0700
Round Robin IP addresses when retry uploading/downloading segments (#7585)
* Round Robin IP addresses when retry upload/download segments
* Add unit test and reset retry count
* add header
* fix integration test by updating tlstest.jks
* Tolerate LB disallowing direct access by IP address issue
* Add comments about mockito-inline to pom file
---
pinot-common/pom.xml | 13 +-
.../common/utils/FileUploadDownloadClient.java | 43 +++--
.../pinot/common/utils/RoundRobinURIProvider.java | 64 ++++++++
.../common/utils/fetcher/HttpSegmentFetcher.java | 34 +++-
.../common/utils/RoundRobinURIProviderTest.java | 179 +++++++++++++++++++++
.../tests/BasicAuthTlsRealtimeIntegrationTest.java | 12 ++
.../src/test/resources/tlstest.jks | Bin 4271 -> 9123 bytes
.../minion/tasks/SegmentConversionUtils.java | 31 +++-
pom.xml | 6 +
9 files changed, 361 insertions(+), 21 deletions(-)
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index c6c2139..3c2d09a 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -245,7 +245,18 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <!--
+ NOTE: use `mockito-inline` here instead of `mockito-core`, as mockito-core does not support mocking static methods:
+ ```
+ org.mockito.exceptions.base.MockitoException:
+ The used MockMaker SubclassByteBuddyMockMaker does not support the creation of static mocks
+
+ Mockito's inline mock maker supports static mocks based on the Instrumentation API.
+ You can simply enable this mock mode, by placing the 'mockito-inline' artifact where you are currently using 'mockito-core'.
+ Note that Mockito's inline mock maker is not supported on Android.
+ ```
+ -->
+ <artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 551b63a..9305d16 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -438,10 +438,11 @@ public class FileUploadDownloadClient implements Closeable {
return requestBuilder.build();
}
- private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) {
+ private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken,
+ List<Header> httpHeaders) {
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
if (StringUtils.isNotBlank(authToken)) {
- requestBuilder.addHeader("Authorization", authToken);
+ requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authToken);
}
setTimeout(requestBuilder, socketTimeoutMs);
String userInfo = uri.getUserInfo();
@@ -450,6 +451,11 @@ public class FileUploadDownloadClient implements Closeable {
String authHeader = "Basic " + encoded;
requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authHeader);
}
+ if (httpHeaders != null && !httpHeaders.isEmpty()) {
+ for (Header header : httpHeaders) {
+ requestBuilder.addHeader(header);
+ }
+ }
return requestBuilder.build();
}
@@ -819,10 +825,8 @@ public class FileUploadDownloadClient implements Closeable {
SimpleHttpResponse response = sendRequest(requestBuilder.build());
String downloadUrl = response.getResponse();
if (downloadUrl.isEmpty()) {
- throw new HttpErrorStatusException(
- String.format(
- "Returned segment download url is empty after requesting servers to upload by the path: %s",
- uri),
+ throw new HttpErrorStatusException(String
+ .format("Returned segment download url is empty after requesting servers to upload by the path: %s", uri),
response.getStatusCode());
}
return downloadUrl;
@@ -978,7 +982,7 @@ public class FileUploadDownloadClient implements Closeable {
*
* Download a file using default settings, with an optional auth token
*
- * @see FileUploadDownloadClient#downloadFile(URI, int, File, String)
+ * @see FileUploadDownloadClient#downloadFile(URI, int, File, String, List)
*
* @param uri URI
* @param socketTimeoutMs Socket timeout in milliseconds
@@ -990,7 +994,7 @@ public class FileUploadDownloadClient implements Closeable {
@Deprecated
public int downloadFile(URI uri, int socketTimeoutMs, File dest)
throws IOException, HttpErrorStatusException {
- return downloadFile(uri, socketTimeoutMs, dest, null);
+ return downloadFile(uri, socketTimeoutMs, dest, null, null);
}
/**
@@ -1000,13 +1004,14 @@ public class FileUploadDownloadClient implements Closeable {
* @param socketTimeoutMs Socket timeout in milliseconds
* @param dest File destination
* @param authToken auth token
+ * @param httpHeaders http headers
* @return Response status code
* @throws IOException
* @throws HttpErrorStatusException
*/
- public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken)
+ public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken, List<Header> httpHeaders)
throws IOException, HttpErrorStatusException {
- HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken);
+ HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken, httpHeaders);
try (CloseableHttpResponse response = _httpClient.execute(request)) {
StatusLine statusLine = response.getStatusLine();
int statusCode = statusLine.getStatusCode();
@@ -1064,7 +1069,23 @@ public class FileUploadDownloadClient implements Closeable {
*/
public int downloadFile(URI uri, File dest, String authToken)
throws IOException, HttpErrorStatusException {
- return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken);
+ // Forward the caller's auth token; this overload has no extra HTTP headers to pass along.
+ return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken, null);
+ }
+
+ /**
+ * Download a file.
+ *
+ * @param uri URI
+ * @param dest File destination
+ * @param authToken auth token
+ * @param httpHeaders http headers
+ * @return Response status code
+ * @throws IOException
+ * @throws HttpErrorStatusException
+ */
+ public int downloadFile(URI uri, File dest, String authToken, List<Header> httpHeaders)
+ throws IOException, HttpErrorStatusException {
+ return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken, httpHeaders);
}
@Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
new file mode 100644
index 0000000..36f837e
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.net.InetAddresses;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import org.apache.http.client.utils.URIBuilder;
+
+
+/**
+ * RoundRobinURIProvider accepts a URI, resolves its host name into one URI per IP address, and returns those
+ * IP-address URIs in a round-robin way. If the host of the URI is already an IP address literal, no resolution is
+ * done and the original URI is the only one returned.
+ *
+ * <p>This class is not synchronized: the round-robin cursor is a plain field, so an instance shared between threads
+ * must be synchronized externally.
+ */
+public class RoundRobinURIProvider {
+
+  private final URI[] _uris;
+  private int _index = 0;
+
+  /**
+   * Builds the list of URIs to rotate through.
+   *
+   * @param originalUri URI whose host is either an IP address literal or a host name to resolve
+   * @throws IllegalArgumentException if the URI has no host component
+   * @throws UnknownHostException if the host name cannot be resolved
+   * @throws URISyntaxException if a per-address URI cannot be built
+   */
+  public RoundRobinURIProvider(URI originalUri)
+      throws UnknownHostException, URISyntaxException {
+    String hostName = originalUri.getHost();
+    if (hostName == null) {
+      // URI#getHost() returns null when the URI has no host component (e.g. opaque or relative URIs). Fail fast
+      // with a clear message rather than passing null on to the host-name checks below.
+      throw new IllegalArgumentException("URI does not specify a host: " + originalUri);
+    }
+    if (InetAddresses.isInetAddress(hostName)) {
+      // Already an IP address: nothing to resolve, keep the URI exactly as given.
+      _uris = new URI[]{originalUri};
+    } else {
+      // Resolve host name to IP addresses via DNS
+      InetAddress[] addresses = InetAddress.getAllByName(hostName);
+      _uris = new URI[addresses.length];
+      // The builder is seeded with the original URI, so only the host differs between the generated URIs.
+      URIBuilder uriBuilder = new URIBuilder(originalUri);
+      for (int i = 0; i < addresses.length; i++) {
+        String ip = addresses[i].getHostAddress();
+        _uris[i] = uriBuilder.setHost(ip).build();
+      }
+    }
+  }
+
+  /**
+   * @return the number of distinct URIs this provider rotates through
+   */
+  public int numAddresses() {
+    return _uris.length;
+  }
+
+  /**
+   * @return the next URI in round-robin order, wrapping around to the first one after the last
+   */
+  public URI next() {
+    URI result = _uris[_index];
+    _index = (_index + 1) % _uris.length;
+    return result;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 7356e7b..1c8c928 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -18,10 +18,18 @@
*/
package org.apache.pinot.common.utils.fetcher;
+import com.google.common.net.InetAddresses;
import java.io.File;
import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.message.BasicHeader;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.RoundRobinURIProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -35,19 +43,37 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
}
@Override
- public void fetchSegmentToLocal(URI uri, File dest)
+ public void fetchSegmentToLocal(URI downloadURI, File dest)
throws Exception {
- RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+ // Create a RoundRobinURIProvider to round robin IP addresses when retrying the download. Otherwise we may always
+ // try to download from the same broken host, as: 1) DNS may not round robin the IP addresses, and 2) the OS
+ // caches the DNS resolution result.
+ RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+ int retryCount = Math.max(_retryCount, uriProvider.numAddresses());
+ _logger.info("Retry downloading for {} times. retryCount from pinot server config: {}, number of IP addresses for "
+ + "download URI: {}", retryCount, _retryCount, uriProvider.numAddresses());
+ // If the original download address is specified as a host name, a "Host" HTTP header must be added to the HTTP
+ // request. Otherwise, if the download address is a LB address and the LB is configured to "disallow direct
+ // access by IP address", downloading will fail. The header is identical for every attempt, so build it once.
+ String hostName = downloadURI.getHost();
+ int port = downloadURI.getPort();
+ List<Header> httpHeaders = new LinkedList<>();
+ if (!InetAddresses.isInetAddress(hostName)) {
+ // URI#getPort() returns -1 when the URI has no explicit port; omit the port instead of sending "host:-1".
+ httpHeaders.add(new BasicHeader(HttpHeaders.HOST, port == -1 ? hostName : hostName + ":" + port));
+ }
+ RetryPolicies.exponentialBackoffRetryPolicy(retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+ URI uri = uriProvider.next();
try {
- int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
+ int statusCode = _httpClient.downloadFile(uri, dest, _authToken, httpHeaders);
_logger
.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
statusCode);
return true;
} catch (HttpErrorStatusException e) {
int statusCode = e.getStatusCode();
- if (statusCode >= 500) {
+ if (statusCode == HttpStatus.SC_NOT_FOUND || statusCode >= 500) {
// Temporary exception
+ // 404 is treated as a temporary exception, as the downloadURI may be backed by multiple hosts;
+ // if a single host is down, we can retry with another host.
_logger.warn("Got temporary error status code: {} while downloading segment from: {} to: {}", statusCode, uri,
dest, e);
return false;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
new file mode 100644
index 0000000..edd1c90
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.net.InetAddresses;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Verifies that {@link RoundRobinURIProvider} replaces only the host of a URI with each resolved IP address, keeps
+ * IP-literal URIs untouched, and cycles through the resulting URIs in order.
+ */
+public class RoundRobinURIProviderTest {
+
+  private static final String[] SCHEMES = {"http", "https"};
+
+  // Hosts used in the original URIs; HOSTS[i] is expected to be rewritten to each entry of EXPECTED_HOSTS[i].
+  private static final String[] HOSTS = {"127.0.0.1", "localhost", "testweb.com"};
+  private static final String[][] EXPECTED_HOSTS = {
+      {"127.0.0.1"},
+      {"127.0.0.1", "[0:0:0:0:0:0:0:1]"},
+      {"192.168.3.1", "192.168.3.2", "192.168.3.3"}
+  };
+
+  // Everything that follows the host in the URI; it must be carried over unchanged.
+  private static final String[] URI_SUFFIXES = {
+      "", "/", "/?", "/?it=5", "/me/out?it=5",
+      ":20000", ":20000/", ":20000/?", ":20000/?it=5", ":20000/me/out?it=5"
+  };
+
+  @Test
+  public void testHostAddressRoundRobin()
+      throws URISyntaxException, UnknownHostException {
+
+    // Build the fake DNS answers BEFORE InetAddress is mocked: getByAddress is itself a static method of the
+    // class being mocked.
+    InetAddress[] testWebAddresses = new InetAddress[]{
+        InetAddress.getByAddress("testweb.com", InetAddresses.forString("192.168.3.1").getAddress()),
+        InetAddress.getByAddress("testweb.com", InetAddresses.forString("192.168.3.2").getAddress()),
+        InetAddress.getByAddress("testweb.com", InetAddresses.forString("192.168.3.3").getAddress())
+    };
+    InetAddress[] localHostAddresses = new InetAddress[]{
+        InetAddress.getByAddress("localhost", InetAddresses.forString("127.0.0.1").getAddress()),
+        InetAddress.getByAddress("localhost", InetAddresses.forString("0:0:0:0:0:0:0:1").getAddress())
+    };
+
+    // A static mock stays registered on the current thread until it is closed, so scope it with
+    // try-with-resources; otherwise later tests on this thread would still see the mocked InetAddress.
+    try (MockedStatic<InetAddress> mock = Mockito.mockStatic(InetAddress.class)) {
+      mock.when(() -> InetAddress.getAllByName("localhost")).thenReturn(localHostAddresses);
+      mock.when(() -> InetAddress.getAllByName("testweb.com")).thenReturn(testWebAddresses);
+
+      for (TestCase testCase : buildTestCases()) {
+        RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new URI(testCase._originalUri));
+        int n = testCase._expectedUris.length;
+        // Go around twice to check that the provider wraps back to the first address.
+        for (int i = 0; i < 2 * n; i++) {
+          Assert.assertEquals(uriProvider.next().toString(), testCase._expectedUris[i % n],
+              "Unexpected URI at position " + i + " for original URI: " + testCase._originalUri);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds one test case per scheme/host/suffix combination. The expected URIs differ from the original URI only
+   * in the host, which is replaced by each address the host is expected to resolve to.
+   */
+  private static TestCase[] buildTestCases() {
+    TestCase[] testCases = new TestCase[SCHEMES.length * HOSTS.length * URI_SUFFIXES.length];
+    int next = 0;
+    for (String scheme : SCHEMES) {
+      for (int h = 0; h < HOSTS.length; h++) {
+        for (String suffix : URI_SUFFIXES) {
+          String[] expectedUris = new String[EXPECTED_HOSTS[h].length];
+          for (int i = 0; i < expectedUris.length; i++) {
+            expectedUris[i] = scheme + "://" + EXPECTED_HOSTS[h][i] + suffix;
+          }
+          testCases[next++] = new TestCase(scheme + "://" + HOSTS[h] + suffix, expectedUris);
+        }
+      }
+    }
+    return testCases;
+  }
+
+  /** An original URI together with the URIs the provider is expected to return, in order. */
+  static class TestCase {
+    final String _originalUri;
+    final String[] _expectedUris;
+
+    TestCase(String originalUri, String[] expectedUris) {
+      _originalUri = originalUri;
+      _expectedUris = expectedUris;
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
index de222cb..f04f1e1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
@@ -258,6 +258,18 @@ public class BasicAuthTlsRealtimeIntegrationTest extends BaseClusterIntegrationT
void prepareTlsStore()
throws Exception {
try (OutputStream os = new FileOutputStream(_tlsStore);
+ /*
+ * Command to generate the tlstest.jks file (generate key pairs for both IPV4 and IPV6 addresses):
+ * ```
+ * keytool -genkeypair -keystore tlstest.jks -dname "CN=test, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, \
+ * C=Unknown" -keypass changeit -storepass changeit -keyalg RSA -alias localhost-ipv4 -ext \
+ * SAN=dns:localhost,ip:127.0.0.1
+ *
+ * keytool -genkeypair -keystore tlstest.jks -dname "CN=test, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, \
+ * C=Unknown" -keypass changeit -storepass changeit -keyalg RSA -alias localhost-ipv6 -ext \
+ * SAN=dns:localhost,ip:0:0:0:0:0:0:0:1
+ * ```
+ */
InputStream is = getClass().getResourceAsStream("/tlstest.jks")) {
Preconditions.checkNotNull(is, "tlstest.jks must be on the classpath");
IOUtils.copy(is, os);
diff --git a/pinot-integration-tests/src/test/resources/tlstest.jks b/pinot-integration-tests/src/test/resources/tlstest.jks
index 9609d94..478cabf 100644
Binary files a/pinot-integration-tests/src/test/resources/tlstest.jks and b/pinot-integration-tests/src/test/resources/tlstest.jks differ
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 53efe14..6c60aed 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -18,17 +18,21 @@
*/
package org.apache.pinot.plugin.minion.tasks;
+import com.google.common.net.InetAddresses;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.RoundRobinURIProvider;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionContext;
@@ -57,10 +61,16 @@ public class SegmentConversionUtils {
public static void uploadSegment(Map<String, String> configs, List<Header> httpHeaders,
List<NameValuePair> parameters, String tableNameWithType, String segmentName, String uploadURL, File fileToUpload)
throws Exception {
+ // Create a RoundRobinURIProvider to round robin IP addresses when retry uploading. Otherwise may always try to
+ // upload to a same broken host as: 1) DNS may not RR the IP addresses 2) OS cache the DNS resolution result.
+ RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new URI(uploadURL));
// Generate retry policy based on the config
- String maxNumAttemptsConfig = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
- int maxNumAttempts =
- maxNumAttemptsConfig != null ? Integer.parseInt(maxNumAttemptsConfig) : DEFAULT_MAX_NUM_ATTEMPTS;
+ String maxNumAttemptsConfigStr = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
+ int maxNumAttemptsFromConfig =
+ maxNumAttemptsConfigStr != null ? Integer.parseInt(maxNumAttemptsConfigStr) : DEFAULT_MAX_NUM_ATTEMPTS;
+ int maxNumAttempts = Math.max(maxNumAttemptsFromConfig, uriProvider.numAddresses());
+ LOGGER.info("Retry uploading for {} times. Max num attempts from pinot minion config: {}, number of IP addresses "
+ + "for upload URI: {}", maxNumAttempts, maxNumAttemptsFromConfig, uriProvider.numAddresses());
String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
long initialRetryDelayMs =
initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
@@ -74,17 +84,28 @@ public class SegmentConversionUtils {
SSLContext sslContext = MinionContext.getInstance().getSSLContext();
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
+ // If the original upload address is specified as a host name, a "Host" HTTP header must be added to the HTTP
+ // request. Otherwise, if the upload address is a LB address and the LB is configured to "disallow direct
+ // access by IP address", the upload will fail.
+ // The headers are built once, on a private copy: adding to the caller's list inside the retry loop would
+ // append a duplicate "Host" header on every attempt and modify the caller's argument.
+ URI uploadURI = new URI(uploadURL);
+ String hostName = uploadURI.getHost();
+ int hostPort = uploadURI.getPort();
+ List<Header> requestHeaders = new java.util.ArrayList<>();
+ if (httpHeaders != null) {
+ requestHeaders.addAll(httpHeaders);
+ }
+ if (!InetAddresses.isInetAddress(hostName)) {
+ // URI#getPort() returns -1 when the URI has no explicit port; omit the port instead of sending "host:-1".
+ requestHeaders.add(new BasicHeader(HttpHeaders.HOST, hostPort == -1 ? hostName : hostName + ":" + hostPort));
+ }
retryPolicy.attempt(() -> {
+ URI uri = uriProvider.next();
try {
SimpleHttpResponse response = fileUploadDownloadClient
- .uploadSegment(new URI(uploadURL), segmentName, fileToUpload, httpHeaders, parameters,
+ .uploadSegment(uri, segmentName, fileToUpload, requestHeaders, parameters,
FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
LOGGER.info("Got response {}: {} while uploading table: {}, segment: {} with uploadURL: {}",
response.getStatusCode(), response.getResponse(), tableNameWithType, segmentName, uploadURL);
return true;
} catch (HttpErrorStatusException e) {
int statusCode = e.getStatusCode();
- if (statusCode == HttpStatus.SC_CONFLICT || statusCode >= 500) {
+ if (statusCode == HttpStatus.SC_CONFLICT || statusCode == HttpStatus.SC_NOT_FOUND || statusCode >= 500) {
// Temporary exception
+ // 404 is treated as a temporary exception, as the uploadURL may be backed by multiple hosts;
+ // if a single host is down, we can retry with another host.
LOGGER.warn("Caught temporary exception while uploading segment: {}, will retry", segmentName, e);
return false;
} else {
diff --git a/pom.xml b/pom.xml
index 168ec2a..6a69ab9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1256,6 +1256,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>3.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.0</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org