You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 04:02:50 UTC
[pulsar] branch master updated: With multi-host HTTP client, stop after trying all the addresses (#4272)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6b2eaa8 With multi-host HTTP client, stop after trying all the addresses (#4272)
6b2eaa8 is described below
commit 6b2eaa8ac81e1e4113f1b23b89b3cb1a09d1de68
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue May 14 21:02:44 2019 -0700
With multi-host HTTP client, stop after trying all the addresses (#4272)
* With multi-host HTTP client, stop after trying all the addresses
* Also fixed the name resolver to stop picking a random address on every call; it now starts at a random index and advances round-robin
---
.../client/admin/internal/http/AsyncHttpConnector.java | 15 ++++++++++++++-
.../pulsar/client/impl/PulsarServiceNameResolver.java | 6 +++++-
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 279ba35..c90bf60 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -26,6 +26,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -121,8 +123,17 @@ public class AsyncHttpConnector implements Connector {
CompletableFuture<ClientResponse> future = new CompletableFuture<>();
long startTime = System.currentTimeMillis();
+ Throwable lastException = null;
+ Set<InetSocketAddress> triedAddresses = new HashSet<>();
+
while (true) {
InetSocketAddress address = serviceNameResolver.resolveHost();
+ if (triedAddresses.contains(address)) {
+ // We already tried all available addresses
+ throw new ProcessingException((lastException.getMessage()), lastException);
+ }
+
+ triedAddresses.add(address);
URI requestUri = replaceWithNew(address, jerseyRequest.getUri());
jerseyRequest.setUri(requestUri);
CompletableFuture<ClientResponse> tempFuture = new CompletableFuture<>();
@@ -133,15 +144,17 @@ public class AsyncHttpConnector implements Connector {
"Request timeout, the last try service url is : " + jerseyRequest.getUri().toString());
}
} catch (ExecutionException ex) {
+ Throwable e = ex.getCause() == null ? ex : ex.getCause();
if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
- Throwable e = ex.getCause() == null ? ex : ex.getCause();
throw new ProcessingException((e.getMessage()), e);
}
+ lastException = e;
continue;
} catch (Exception e) {
if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
throw new ProcessingException(e.getMessage(), e);
}
+ lastException = e;
continue;
}
future = tempFuture;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index c9cc2ff..9d15ccf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -38,6 +38,7 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
private volatile ServiceURI serviceUri;
private volatile String serviceUrl;
+ private volatile int currentIndex;
private volatile List<InetSocketAddress> addressList;
@Override
@@ -50,7 +51,9 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
if (list.size() == 1) {
return list.get(0);
} else {
- return list.get(randomIndex(list.size()));
+ currentIndex = (currentIndex + 1) % list.size();
+ return list.get(currentIndex);
+
}
}
@@ -96,6 +99,7 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
this.addressList = addresses;
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
+ this.currentIndex = randomIndex(addresses.size());
}
private static int randomIndex(int numAddresses) {