You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/10/27 00:49:38 UTC

[pinot] branch master updated: Round Robin IP addresses when retry uploading/downloading segments (#7585)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0166135  Round Robin IP addresses when retry uploading/downloading segments (#7585)
0166135 is described below

commit 0166135772eb165e52900db558490e77eae767fb
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Tue Oct 26 17:49:24 2021 -0700

    Round Robin IP addresses when retry uploading/downloading segments (#7585)
    
    * Round Robin IP addresses when retry upload/download segments
    
    * Add unit test and reset retry count
    
    * add header
    
    * fix integration test by updating tlstest.jks
    
    * Tolerate LB disallowing direct access by IP address issue
    
    * Add comments about mockito-inline to pom file
---
 pinot-common/pom.xml                               |  13 +-
 .../common/utils/FileUploadDownloadClient.java     |  43 +++--
 .../pinot/common/utils/RoundRobinURIProvider.java  |  64 ++++++++
 .../common/utils/fetcher/HttpSegmentFetcher.java   |  34 +++-
 .../common/utils/RoundRobinURIProviderTest.java    | 179 +++++++++++++++++++++
 .../tests/BasicAuthTlsRealtimeIntegrationTest.java |  12 ++
 .../src/test/resources/tlstest.jks                 | Bin 4271 -> 9123 bytes
 .../minion/tasks/SegmentConversionUtils.java       |  31 +++-
 pom.xml                                            |   6 +
 9 files changed, 361 insertions(+), 21 deletions(-)

diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index c6c2139..3c2d09a 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -245,7 +245,18 @@
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
+      <!--
+      NOTE: use `mockito-inline` here instead of `mockito-core`, as mockito-core does not support mocking static methods:
+      ```
+      org.mockito.exceptions.base.MockitoException:
+      The used MockMaker SubclassByteBuddyMockMaker does not support the creation of static mocks
+
+      Mockito's inline mock maker supports static mocks based on the Instrumentation API.
+      You can simply enable this mock mode, by placing the 'mockito-inline' artifact where you are currently using 'mockito-core'.
+      Note that Mockito's inline mock maker is not supported on Android.
+      ```
+      -->
+      <artifactId>mockito-inline</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 551b63a..9305d16 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -438,10 +438,11 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
-  private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken) {
+  private static HttpUriRequest getDownloadFileRequest(URI uri, int socketTimeoutMs, String authToken,
+      List<Header> httpHeaders) {
     RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     if (StringUtils.isNotBlank(authToken)) {
-      requestBuilder.addHeader("Authorization", authToken);
+      requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authToken);
     }
     setTimeout(requestBuilder, socketTimeoutMs);
     String userInfo = uri.getUserInfo();
@@ -450,6 +451,11 @@ public class FileUploadDownloadClient implements Closeable {
       String authHeader = "Basic " + encoded;
       requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authHeader);
     }
+    if (httpHeaders != null && !httpHeaders.isEmpty()) {
+      for (Header header : httpHeaders) {
+        requestBuilder.addHeader(header);
+      }
+    }
     return requestBuilder.build();
   }
 
@@ -819,10 +825,8 @@ public class FileUploadDownloadClient implements Closeable {
     SimpleHttpResponse response = sendRequest(requestBuilder.build());
     String downloadUrl = response.getResponse();
     if (downloadUrl.isEmpty()) {
-      throw new HttpErrorStatusException(
-          String.format(
-              "Returned segment download url is empty after requesting servers to upload by the path: %s",
-              uri),
+      throw new HttpErrorStatusException(String
+          .format("Returned segment download url is empty after requesting servers to upload by the path: %s", uri),
           response.getStatusCode());
     }
     return downloadUrl;
@@ -978,7 +982,7 @@ public class FileUploadDownloadClient implements Closeable {
    *
    * Download a file using default settings, with an optional auth token
    *
-   * @see FileUploadDownloadClient#downloadFile(URI, int, File, String)
+   * @see FileUploadDownloadClient#downloadFile(URI, int, File, String, List)
    *
    * @param uri URI
    * @param socketTimeoutMs Socket timeout in milliseconds
@@ -990,7 +994,7 @@ public class FileUploadDownloadClient implements Closeable {
   @Deprecated
   public int downloadFile(URI uri, int socketTimeoutMs, File dest)
       throws IOException, HttpErrorStatusException {
-    return downloadFile(uri, socketTimeoutMs, dest, null);
+    return downloadFile(uri, socketTimeoutMs, dest, null, null);
   }
 
   /**
@@ -1000,13 +1004,14 @@ public class FileUploadDownloadClient implements Closeable {
    * @param socketTimeoutMs Socket timeout in milliseconds
    * @param dest File destination
    * @param authToken auth token
+   * @param httpHeaders http headers
    * @return Response status code
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken)
+  public int downloadFile(URI uri, int socketTimeoutMs, File dest, String authToken, List<Header> httpHeaders)
       throws IOException, HttpErrorStatusException {
-    HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken);
+    HttpUriRequest request = getDownloadFileRequest(uri, socketTimeoutMs, authToken, httpHeaders);
     try (CloseableHttpResponse response = _httpClient.execute(request)) {
       StatusLine statusLine = response.getStatusLine();
       int statusCode = statusLine.getStatusCode();
@@ -1064,7 +1069,23 @@ public class FileUploadDownloadClient implements Closeable {
    */
   public int downloadFile(URI uri, File dest, String authToken)
       throws IOException, HttpErrorStatusException {
-    return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken);
+    return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken, null);
+  }
+
+  /**
+   * Download a file using the default socket timeout, optionally sending an auth token and extra HTTP headers.
+   *
+   * @param uri URI
+   * @param dest File destination
+   * @param authToken auth token; when null or blank no Authorization header is derived from it
+   * @param httpHeaders additional http headers (e.g. Host) to add to the request; may be null or empty
+   * @return Response status code
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public int downloadFile(URI uri, File dest, String authToken, List<Header> httpHeaders)
+      throws IOException, HttpErrorStatusException {
+    return downloadFile(uri, DEFAULT_SOCKET_TIMEOUT_MS, dest, authToken, httpHeaders);
   }
 
   @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
new file mode 100644
index 0000000..36f837e
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.net.InetAddresses;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import org.apache.http.client.utils.URIBuilder;
+
+
+/**
+ * Resolves a URI's host name to all of its IP addresses and hands out the resulting IP-address URIs in round-robin
+ * order via {@link #next()}. URIs whose host is an IP literal or is undefined are returned as-is. Not thread-safe.
+ */
+public class RoundRobinURIProvider {
+
+  private final URI[] _uris;
+  private int _index = 0;
+
+  public RoundRobinURIProvider(URI originalUri)
+      throws UnknownHostException, URISyntaxException {
+    String hostName = originalUri.getHost();
+    if (hostName == null || InetAddresses.isInetAddress(hostName)) {
+      // Nothing to resolve: already an IP literal, or no server-based host (e.g. a host name containing '_')
+      _uris = new URI[]{originalUri};
+    } else {
+      InetAddress[] addresses = InetAddress.getAllByName(hostName);
+      _uris = new URI[addresses.length];
+      URIBuilder uriBuilder = new URIBuilder(originalUri);
+      for (int i = 0; i < addresses.length; i++) {
+        String ip = addresses[i].getHostAddress();
+        _uris[i] = uriBuilder.setHost(ip).build();
+      }
+    }
+  }
+
+  public int numAddresses() {
+    return _uris.length;
+  }
+
+  public URI next() {
+    URI result = _uris[_index];
+    _index = (_index + 1) % _uris.length;
+    return result;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 7356e7b..1c8c928 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -18,10 +18,18 @@
  */
 package org.apache.pinot.common.utils.fetcher;
 
+import com.google.common.net.InetAddresses;
 import java.io.File;
 import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.message.BasicHeader;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.RoundRobinURIProvider;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 
@@ -35,19 +43,37 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
   }
 
   @Override
-  public void fetchSegmentToLocal(URI uri, File dest)
+  public void fetchSegmentToLocal(URI downloadURI, File dest)
       throws Exception {
-    RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+    // Create a RoundRobinURIProvider to round robin IP addresses when retrying downloads. Otherwise we may always
+    // retry the same broken host, because 1) DNS may not round-robin IP addresses and 2) the OS caches DNS results.
+    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+    int retryCount = Math.max(_retryCount, uriProvider.numAddresses());
+    _logger.info("Retry downloading for {} times. retryCount from pinot server config: {}, number of IP addresses for "
+        + "download URI: {}", retryCount, _retryCount, uriProvider.numAddresses());
+    RetryPolicies.exponentialBackoffRetryPolicy(retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+      URI uri = uriProvider.next();
       try {
-        int statusCode = _httpClient.downloadFile(uri, dest, _authToken);
+        String hostName = downloadURI.getHost();
+        int port = downloadURI.getPort();
+        // If the original download address is specified as a host name, we need to add a "Host" HTTP header to the
+        // request. Otherwise, if the download address is an LB address and the LB is configured to "disallow direct
+        // access by IP address", the download will fail.
+        List<Header> httpHeaders = new LinkedList<>();
+        if (!InetAddresses.isInetAddress(hostName)) {
+          httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + port));
+        }
+        int statusCode = _httpClient.downloadFile(uri, dest, _authToken, httpHeaders);
         _logger
             .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
                 statusCode);
         return true;
       } catch (HttpErrorStatusException e) {
         int statusCode = e.getStatusCode();
-        if (statusCode >= 500) {
+        if (statusCode == HttpStatus.SC_NOT_FOUND || statusCode >= 500) {
           // Temporary exception
+          // 404 is treated as a temporary exception, as the downloadURI may be backed by multiple hosts:
+          // if a single host is down, we can retry with another host.
           _logger.warn("Got temporary error status code: {} while downloading segment from: {} to: {}", statusCode, uri,
               dest, e);
           return false;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
new file mode 100644
index 0000000..edd1c90
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.net.InetAddresses;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class RoundRobinURIProviderTest {
+
+  @Test
+  public void testHostAddressRoundRobin()
+      throws URISyntaxException, UnknownHostException {
+    InetAddress[] testWebAddresses = new InetAddress[]{
+        InetAddress.getByAddress("testweb.com", InetAddresses.forString("192.168.3.1").getAddress()),
+        InetAddress.getByAddress("testweb.com", InetAddresses.forString("192.168.3.2").getAddress()),
+        InetAddress.getByAddress("testweb.com", InetAddresses.forString("192.168.3.3").getAddress())
+    };
+    InetAddress[] localHostAddresses = new InetAddress[]{
+        InetAddress.getByAddress("localhost", InetAddresses.forString("127.0.0.1").getAddress()),
+        InetAddress.getByAddress("localhost", InetAddresses.forString("0:0:0:0:0:0:0:1").getAddress())
+    };
+    TestCase[] testCases = new TestCase[]{
+        new TestCase("http://127.0.0.1", new String[]{"http://127.0.0.1"}),
+        new TestCase("http://127.0.0.1/", new String[]{"http://127.0.0.1/"}),
+        new TestCase("http://127.0.0.1/?", new String[]{"http://127.0.0.1/?"}),
+        new TestCase("http://127.0.0.1/?it=5", new String[]{"http://127.0.0.1/?it=5"}),
+        new TestCase("http://127.0.0.1/me/out?it=5", new String[]{"http://127.0.0.1/me/out?it=5"}),
+        new TestCase("http://127.0.0.1:20000", new String[]{"http://127.0.0.1:20000"}),
+        new TestCase("http://127.0.0.1:20000/", new String[]{"http://127.0.0.1:20000/"}),
+        new TestCase("http://127.0.0.1:20000/?", new String[]{"http://127.0.0.1:20000/?"}),
+        new TestCase("http://127.0.0.1:20000/?it=5", new String[]{"http://127.0.0.1:20000/?it=5"}),
+        new TestCase("http://127.0.0.1:20000/me/out?it=5", new String[]{"http://127.0.0.1:20000/me/out?it=5"}),
+
+        new TestCase("http://localhost", new String[]{"http://127.0.0.1", "http://[0:0:0:0:0:0:0:1]"}),
+        new TestCase("http://localhost/", new String[]{"http://127.0.0.1/", "http://[0:0:0:0:0:0:0:1]/"}),
+        new TestCase("http://localhost/?", new String[]{"http://127.0.0.1/?", "http://[0:0:0:0:0:0:0:1]/?"}),
+        new TestCase("http://localhost/?it=5",
+            new String[]{"http://127.0.0.1/?it=5", "http://[0:0:0:0:0:0:0:1]/?it=5"}),
+        new TestCase("http://localhost/me/out?it=5",
+            new String[]{"http://127.0.0.1/me/out?it=5", "http://[0:0:0:0:0:0:0:1]/me/out?it=5"}),
+        new TestCase("http://localhost:20000",
+            new String[]{"http://127.0.0.1:20000", "http://[0:0:0:0:0:0:0:1]:20000"}),
+        new TestCase("http://localhost:20000/",
+            new String[]{"http://127.0.0.1:20000/", "http://[0:0:0:0:0:0:0:1]:20000/"}),
+        new TestCase("http://localhost:20000/?",
+            new String[]{"http://127.0.0.1:20000/?", "http://[0:0:0:0:0:0:0:1]:20000/?"}),
+        new TestCase("http://localhost:20000/?it=5",
+            new String[]{"http://127.0.0.1:20000/?it=5", "http://[0:0:0:0:0:0:0:1]:20000/?it=5"}),
+        new TestCase("http://localhost:20000/me/out?it=5",
+            new String[]{"http://127.0.0.1:20000/me/out?it=5", "http://[0:0:0:0:0:0:0:1]:20000/me/out?it=5"}),
+
+        new TestCase("http://testweb.com",
+            new String[]{"http://192.168.3.1", "http://192.168.3.2", "http://192.168.3.3"}),
+        new TestCase("http://testweb.com/",
+            new String[]{"http://192.168.3.1/", "http://192.168.3.2/", "http://192.168.3.3/"}),
+        new TestCase("http://testweb.com/?",
+            new String[]{"http://192.168.3.1/?", "http://192.168.3.2/?", "http://192.168.3.3/?"}),
+        new TestCase("http://testweb.com/?it=5",
+            new String[]{"http://192.168.3.1/?it=5", "http://192.168.3.2/?it=5", "http://192.168.3.3/?it=5"}),
+        new TestCase("http://testweb.com/me/out?it=5",
+            new String[]{"http://192.168.3.1/me/out?it=5", "http://192.168.3.2/me/out?it=5",
+                "http://192.168.3.3/me/out?it=5"}),
+        new TestCase("http://testweb.com:20000",
+            new String[]{"http://192.168.3.1:20000", "http://192.168.3.2:20000", "http://192.168.3.3:20000"}),
+        new TestCase("http://testweb.com:20000/",
+            new String[]{"http://192.168.3.1:20000/", "http://192.168.3.2:20000/", "http://192.168.3.3:20000/"}),
+        new TestCase("http://testweb.com:20000/?",
+            new String[]{"http://192.168.3.1:20000/?", "http://192.168.3.2:20000/?", "http://192.168.3.3:20000/?"}),
+        new TestCase("http://testweb.com:20000/?it=5",
+            new String[]{"http://192.168.3.1:20000/?it=5", "http://192.168.3.2:20000/?it=5",
+                "http://192.168.3.3:20000/?it=5"}),
+        new TestCase("http://testweb.com:20000/me/out?it=5",
+            new String[]{"http://192.168.3.1:20000/me/out?it=5", "http://192.168.3.2:20000/me/out?it=5",
+                "http://192.168.3.3:20000/me/out?it=5"}),
+
+        new TestCase("https://127.0.0.1", new String[]{"https://127.0.0.1"}),
+        new TestCase("https://127.0.0.1/", new String[]{"https://127.0.0.1/"}),
+        new TestCase("https://127.0.0.1/?", new String[]{"https://127.0.0.1/?"}),
+        new TestCase("https://127.0.0.1/?it=5", new String[]{"https://127.0.0.1/?it=5"}),
+        new TestCase("https://127.0.0.1/me/out?it=5", new String[]{"https://127.0.0.1/me/out?it=5"}),
+        new TestCase("https://127.0.0.1:20000", new String[]{"https://127.0.0.1:20000"}),
+        new TestCase("https://127.0.0.1:20000/", new String[]{"https://127.0.0.1:20000/"}),
+        new TestCase("https://127.0.0.1:20000/?", new String[]{"https://127.0.0.1:20000/?"}),
+        new TestCase("https://127.0.0.1:20000/?it=5", new String[]{"https://127.0.0.1:20000/?it=5"}),
+        new TestCase("https://127.0.0.1:20000/me/out?it=5",
+            new String[]{"https://127.0.0.1:20000/me/out?it=5"}),
+
+        new TestCase("https://localhost", new String[]{"https://127.0.0.1", "https://[0:0:0:0:0:0:0:1]"}),
+        new TestCase("https://localhost/", new String[]{"https://127.0.0.1/", "https://[0:0:0:0:0:0:0:1]/"}),
+        new TestCase("https://localhost/?", new String[]{"https://127.0.0.1/?", "https://[0:0:0:0:0:0:0:1]/?"}),
+        new TestCase("https://localhost/?it=5",
+            new String[]{"https://127.0.0.1/?it=5", "https://[0:0:0:0:0:0:0:1]/?it=5"}),
+        new TestCase("https://localhost/me/out?it=5",
+            new String[]{"https://127.0.0.1/me/out?it=5", "https://[0:0:0:0:0:0:0:1]/me/out?it=5"}),
+        new TestCase("https://localhost:20000",
+            new String[]{"https://127.0.0.1:20000", "https://[0:0:0:0:0:0:0:1]:20000"}),
+        new TestCase("https://localhost:20000/",
+            new String[]{"https://127.0.0.1:20000/", "https://[0:0:0:0:0:0:0:1]:20000/"}),
+        new TestCase("https://localhost:20000/?",
+            new String[]{"https://127.0.0.1:20000/?", "https://[0:0:0:0:0:0:0:1]:20000/?"}),
+        new TestCase("https://localhost:20000/?it=5",
+            new String[]{"https://127.0.0.1:20000/?it=5", "https://[0:0:0:0:0:0:0:1]:20000/?it=5"}),
+
+        new TestCase("https://testweb.com",
+            new String[]{"https://192.168.3.1", "https://192.168.3.2", "https://192.168.3.3"}),
+        new TestCase("https://testweb.com/",
+            new String[]{"https://192.168.3.1/", "https://192.168.3.2/", "https://192.168.3.3/"}),
+        new TestCase("https://testweb.com/?",
+            new String[]{"https://192.168.3.1/?", "https://192.168.3.2/?", "https://192.168.3.3/?"}),
+        new TestCase("https://testweb.com/?it=5",
+            new String[]{"https://192.168.3.1/?it=5", "https://192.168.3.2/?it=5", "https://192.168.3.3/?it=5"}),
+        new TestCase("https://testweb.com/me/out?it=5",
+            new String[]{"https://192.168.3.1/me/out?it=5", "https://192.168.3.2/me/out?it=5",
+                "https://192.168.3.3/me/out?it=5"}),
+        new TestCase("https://testweb.com:20000",
+            new String[]{"https://192.168.3.1:20000", "https://192.168.3.2:20000", "https://192.168.3.3:20000"}),
+        new TestCase("https://testweb.com:20000/",
+            new String[]{"https://192.168.3.1:20000/", "https://192.168.3.2:20000/", "https://192.168.3.3:20000/"}),
+        new TestCase("https://testweb.com:20000/?",
+            new String[]{"https://192.168.3.1:20000/?", "https://192.168.3.2:20000/?", "https://192.168.3.3:20000/?"}),
+        new TestCase("https://testweb.com:20000/?it=5",
+            new String[]{"https://192.168.3.1:20000/?it=5", "https://192.168.3.2:20000/?it=5",
+                "https://192.168.3.3:20000/?it=5"}),
+        new TestCase("https://testweb.com:20000/me/out?it=5",
+            new String[]{"https://192.168.3.1:20000/me/out?it=5", "https://192.168.3.2:20000/me/out?it=5",
+                "https://192.168.3.3:20000/me/out?it=5"}),
+    };
+
+    // Close the static mock when done: it is active per thread and would otherwise leak into later tests.
+    try (MockedStatic<InetAddress> mock = Mockito.mockStatic(InetAddress.class)) {
+      mock.when(() -> InetAddress.getAllByName("localhost")).thenReturn(localHostAddresses);
+      mock.when(() -> InetAddress.getAllByName("testweb.com")).thenReturn(testWebAddresses);
+      for (TestCase testCase : testCases) {
+        String uri = testCase._originalUri;
+        RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new URI(uri));
+        int n = testCase._expectedUris.length;
+        Assert.assertEquals(uriProvider.numAddresses(), n);
+        for (int i = 0; i < 2 * n; i++) {
+          String expectedUri = testCase._expectedUris[i % n];
+          Assert.assertEquals(uriProvider.next().toString(), expectedUri);
+        }
+      }
+    }
+  }
+
+  static class TestCase {
+    String _originalUri;
+    String[] _expectedUris;
+
+    TestCase(String originalUri, String[] expectedUris) {
+      _originalUri = originalUri;
+      _expectedUris = expectedUris;
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
index de222cb..f04f1e1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
@@ -258,6 +258,18 @@ public class BasicAuthTlsRealtimeIntegrationTest extends BaseClusterIntegrationT
   void prepareTlsStore()
       throws Exception {
     try (OutputStream os = new FileOutputStream(_tlsStore);
+        /*
+         * Command to generate the tlstest.jks file (generate key pairs for both IPV4 and IPV6 addresses):
+         * ```
+         *  keytool -genkeypair -keystore tlstest.jks -dname "CN=test, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, \
+         *  C=Unknown" -keypass changeit -storepass changeit -keyalg RSA -alias localhost-ipv4 -ext \
+         *  SAN=dns:localhost,ip:127.0.0.1
+         *
+         *  keytool -genkeypair -keystore tlstest.jks -dname "CN=test, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, \
+         *  C=Unknown" -keypass changeit -storepass changeit -keyalg RSA -alias localhost-ipv6 -ext \
+         *  SAN=dns:localhost,ip:0:0:0:0:0:0:0:1
+         * ```
+         */
         InputStream is = getClass().getResourceAsStream("/tlstest.jks")) {
       Preconditions.checkNotNull(is, "tlstest.jks must be on the classpath");
       IOUtils.copy(is, os);
diff --git a/pinot-integration-tests/src/test/resources/tlstest.jks b/pinot-integration-tests/src/test/resources/tlstest.jks
index 9609d94..478cabf 100644
Binary files a/pinot-integration-tests/src/test/resources/tlstest.jks and b/pinot-integration-tests/src/test/resources/tlstest.jks differ
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 53efe14..6c60aed 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pinot.plugin.minion.tasks;
 
+import com.google.common.net.InetAddresses;
 import java.io.File;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
 import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
 import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.RoundRobinURIProvider;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.minion.MinionContext;
@@ -57,10 +61,16 @@ public class SegmentConversionUtils {
   public static void uploadSegment(Map<String, String> configs, List<Header> httpHeaders,
       List<NameValuePair> parameters, String tableNameWithType, String segmentName, String uploadURL, File fileToUpload)
       throws Exception {
+    // Create a RoundRobinURIProvider to round robin IP addresses when retrying uploads. Otherwise we may always
+    // retry the same broken host, because 1) DNS may not round-robin IP addresses and 2) the OS caches DNS results.
+    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new URI(uploadURL));
     // Generate retry policy based on the config
-    String maxNumAttemptsConfig = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
-    int maxNumAttempts =
-        maxNumAttemptsConfig != null ? Integer.parseInt(maxNumAttemptsConfig) : DEFAULT_MAX_NUM_ATTEMPTS;
+    String maxNumAttemptsConfigStr = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
+    int maxNumAttemptsFromConfig =
+        maxNumAttemptsConfigStr != null ? Integer.parseInt(maxNumAttemptsConfigStr) : DEFAULT_MAX_NUM_ATTEMPTS;
+    int maxNumAttempts = Math.max(maxNumAttemptsFromConfig, uriProvider.numAddresses());
+    LOGGER.info("Retry uploading for {} times. Max num attempts from pinot minion config: {}, number of IP addresses "
+        + "for upload URI: {}", maxNumAttempts, maxNumAttemptsFromConfig, uriProvider.numAddresses());
     String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
     long initialRetryDelayMs =
         initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
@@ -74,17 +84,28 @@ public class SegmentConversionUtils {
     SSLContext sslContext = MinionContext.getInstance().getSSLContext();
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
       retryPolicy.attempt(() -> {
+        URI uri = uriProvider.next();
+        String hostName = new URI(uploadURL).getHost();
+        int hostPort = new URI(uploadURL).getPort();
+        // If the original upload address is specified as a host name, we need to add a "Host" HTTP header to the
+        // request. Otherwise, if the upload address is an LB address and the LB is configured to "disallow direct
+        // access by IP address", the upload will fail.
+        if (!InetAddresses.isInetAddress(hostName)) {
+          httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + hostPort));
+        }
         try {
           SimpleHttpResponse response = fileUploadDownloadClient
-              .uploadSegment(new URI(uploadURL), segmentName, fileToUpload, httpHeaders, parameters,
+              .uploadSegment(uri, segmentName, fileToUpload, httpHeaders, parameters,
                   FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
           LOGGER.info("Got response {}: {} while uploading table: {}, segment: {} with uploadURL: {}",
               response.getStatusCode(), response.getResponse(), tableNameWithType, segmentName, uploadURL);
           return true;
         } catch (HttpErrorStatusException e) {
           int statusCode = e.getStatusCode();
-          if (statusCode == HttpStatus.SC_CONFLICT || statusCode >= 500) {
+          if (statusCode == HttpStatus.SC_CONFLICT || statusCode == HttpStatus.SC_NOT_FOUND || statusCode >= 500) {
             // Temporary exception
+            // 404 is treated as a temporary exception, as the uploadURL may be backed by multiple hosts:
+            // if a single host is down, we can retry with another host.
             LOGGER.warn("Caught temporary exception while uploading segment: {}, will retry", segmentName, e);
             return false;
           } else {
diff --git a/pom.xml b/pom.xml
index 168ec2a..6a69ab9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1256,6 +1256,12 @@
         <scope>test</scope>
       </dependency>
       <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-inline</artifactId>
+        <version>3.9.0</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-csv</artifactId>
         <version>1.0</version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org