You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 04:02:50 UTC

[pulsar] branch master updated: With multi-host HTTP client, stop after trying all the addresses (#4272)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b2eaa8  With multi-host HTTP client, stop after trying all the addresses (#4272)
6b2eaa8 is described below

commit 6b2eaa8ac81e1e4113f1b23b89b3cb1a09d1de68
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue May 14 21:02:44 2019 -0700

    With multi-host HTTP client, stop after trying all the addresses (#4272)
    
    * With multi-host HTTP client, stop after trying all the addresses
    
    * Also fixed the name resolver to cycle through the addresses round-robin (starting from a random index) instead of picking a random address on every call
---
 .../client/admin/internal/http/AsyncHttpConnector.java    | 15 ++++++++++++++-
 .../pulsar/client/impl/PulsarServiceNameResolver.java     |  6 +++++-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 279ba35..c90bf60 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -26,6 +26,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -121,8 +123,17 @@ public class AsyncHttpConnector implements Connector {
 
         CompletableFuture<ClientResponse> future = new CompletableFuture<>();
         long startTime = System.currentTimeMillis();
+        Throwable lastException = null;
+        Set<InetSocketAddress> triedAddresses = new HashSet<>();
+
         while (true) {
             InetSocketAddress address = serviceNameResolver.resolveHost();
+            if (triedAddresses.contains(address)) {
+                // We already tried all available addresses
+                throw new ProcessingException((lastException.getMessage()), lastException);
+            }
+
+            triedAddresses.add(address);
             URI requestUri = replaceWithNew(address, jerseyRequest.getUri());
             jerseyRequest.setUri(requestUri);
             CompletableFuture<ClientResponse> tempFuture = new CompletableFuture<>();
@@ -133,15 +144,17 @@ public class AsyncHttpConnector implements Connector {
                         "Request timeout, the last try service url is : " + jerseyRequest.getUri().toString());
                 }
             } catch (ExecutionException ex) {
+                Throwable e = ex.getCause() == null ? ex : ex.getCause();
                 if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
-                    Throwable e = ex.getCause() == null ? ex : ex.getCause();
                     throw new ProcessingException((e.getMessage()), e);
                 }
+                lastException = e;
                 continue;
             } catch (Exception e) {
                 if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
                     throw new ProcessingException(e.getMessage(), e);
                 }
+                lastException = e;
                 continue;
             }
             future = tempFuture;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index c9cc2ff..9d15ccf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -38,6 +38,7 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
 
     private volatile ServiceURI serviceUri;
     private volatile String serviceUrl;
+    private volatile int currentIndex;
     private volatile List<InetSocketAddress> addressList;
 
     @Override
@@ -50,7 +51,9 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
         if (list.size() == 1) {
             return list.get(0);
         } else {
-            return list.get(randomIndex(list.size()));
+            currentIndex = (currentIndex + 1) % list.size();
+            return list.get(currentIndex);
+
         }
     }
 
@@ -96,6 +99,7 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
         this.addressList = addresses;
         this.serviceUrl = serviceUrl;
         this.serviceUri = uri;
+        this.currentIndex = randomIndex(addresses.size());
     }
 
     private static int randomIndex(int numAddresses) {