You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/18 06:10:16 UTC
[pulsar] branch master updated: Handle multi-hosts in PulsarAdmin
for both sync and async requests. (#6547)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1517d19 Handle multi-hosts in PulsarAdmin for both sync and async requests. (#6547)
1517d19 is described below
commit 1517d1925b8448c04f27b1a1a0654ed313062e37
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Wed Mar 18 14:10:07 2020 +0800
Handle multi-hosts in PulsarAdmin for both sync and async requests. (#6547)
This PR unifies all request logic for PulsarAdmin in the async path.
For PulsarAdmin request, it will retry until we get a success response, fail if we exhausted retry count, or fail if we reach the read timeout.
---
.../admin/internal/http/AsyncHttpConnector.java | 201 +++++++++++----------
1 file changed, 104 insertions(+), 97 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index c90bf60..b9f3899 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -26,19 +26,22 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.function.Supplier;
-import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;
+import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -50,7 +53,6 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.SecurityUtility;
-import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
@@ -69,7 +71,11 @@ public class AsyncHttpConnector implements Connector {
@Getter
private final AsyncHttpClient httpClient;
+ private final int readTimeout;
+ private final int maxRetries;
private final PulsarServiceNameResolver serviceNameResolver;
+ private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
+ new DefaultThreadFactory("delayer"));
public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
@@ -117,142 +123,143 @@ public class AsyncHttpConnector implements Connector {
}
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
+ this.readTimeout = readTimeoutMs;
+ this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
}
+ @Override
public ClientResponse apply(ClientRequest jerseyRequest) {
-
CompletableFuture<ClientResponse> future = new CompletableFuture<>();
- long startTime = System.currentTimeMillis();
- Throwable lastException = null;
- Set<InetSocketAddress> triedAddresses = new HashSet<>();
-
- while (true) {
- InetSocketAddress address = serviceNameResolver.resolveHost();
- if (triedAddresses.contains(address)) {
- // We already tried all available addresses
- throw new ProcessingException((lastException.getMessage()), lastException);
+ apply(jerseyRequest, new AsyncConnectorCallback() {
+ @Override
+ public void response(ClientResponse response) {
+ future.complete(response);
}
- triedAddresses.add(address);
- URI requestUri = replaceWithNew(address, jerseyRequest.getUri());
- jerseyRequest.setUri(requestUri);
- CompletableFuture<ClientResponse> tempFuture = new CompletableFuture<>();
- try {
- resolveRequest(tempFuture, jerseyRequest);
- if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
- throw new ProcessingException(
- "Request timeout, the last try service url is : " + jerseyRequest.getUri().toString());
- }
- } catch (ExecutionException ex) {
- Throwable e = ex.getCause() == null ? ex : ex.getCause();
- if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
- throw new ProcessingException((e.getMessage()), e);
- }
- lastException = e;
- continue;
- } catch (Exception e) {
- if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
- throw new ProcessingException(e.getMessage(), e);
- }
- lastException = e;
- continue;
+ @Override
+ public void failure(Throwable failure) {
+ future.completeExceptionally(failure);
}
- future = tempFuture;
- break;
+ });
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e.getMessage());
}
-
- return future.join();
+ return null;
}
private URI replaceWithNew(InetSocketAddress address, URI uri) {
String originalUri = uri.toString();
String newUri = (originalUri.split(":")[0] + "://")
- + address.getHostName() + ":"
- + address.getPort()
- + uri.getRawPath();
+ + address.getHostName() + ":"
+ + address.getPort()
+ + uri.getRawPath();
if (uri.getRawQuery() != null) {
newUri += "?" + uri.getRawQuery();
}
return URI.create(newUri);
}
-
-
- private void resolveRequest(CompletableFuture<ClientResponse> future,
- ClientRequest jerseyRequest)
- throws InterruptedException, ExecutionException, TimeoutException {
- Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
- @Override
- public void response(ClientResponse response) {
- future.complete(response);
- }
- @Override
- public void failure(Throwable failure) {
- future.completeExceptionally(failure);
+ @Override
+ public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) {
+ CompletableFuture<Response> responseFuture = retryOrTimeOut(jerseyRequest);
+ responseFuture.whenComplete(((response, throwable) -> {
+ if (throwable != null) {
+ callback.failure(throwable);
+ } else {
+ ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()), jerseyRequest);
+ response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
+ if (response.hasResponseBody()) {
+ jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
+ }
+ callback.response(jerseyResponse);
}
- });
+ }));
+ return responseFuture;
+ }
- Integer timeout = httpClient.getConfig().getRequestTimeout() / 3;
+ private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
+ final CompletableFuture<Response> resultFuture = new CompletableFuture<>();
+ retryOperation(resultFuture, () -> oneShot(serviceNameResolver.resolveHost(), request), maxRetries);
+ CompletableFuture<Response> timeoutAfter = timeoutAfter(readTimeout, TimeUnit.MILLISECONDS);
+ return resultFuture.applyToEither(timeoutAfter, Function.identity());
+ }
- Object result = null;
- if (timeout != null && timeout > 0) {
- result = resultFuture.get(timeout, TimeUnit.MILLISECONDS);
- } else {
- result = resultFuture.get();
+ private <T> void retryOperation(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries) {
+
+ if (!resultFuture.isDone()) {
+ final CompletableFuture<T> operationFuture = operation.get();
+
+ operationFuture.whenComplete(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+ } else {
+ if (retries > 0) {
+ retryOperation(
+ resultFuture,
+ operation,
+ retries - 1);
+ } else {
+ resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+ "has been exhausted.", throwable));
+ }
+ }
+ } else {
+ resultFuture.complete(t);
+ }
+ });
+
+ resultFuture.whenComplete(
+ (t, throwable) -> operationFuture.cancel(false));
}
+ }
- if (result != null && result instanceof Throwable) {
- throw new ExecutionException((Throwable) result);
+ public static class RetryException extends Exception {
+ public RetryException(String message, Throwable cause) {
+ super(message, cause);
}
}
- @Override
- public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) {
- final CompletableFuture<Object> future = new CompletableFuture<>();
+ private CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) {
+ ClientRequest currentRequest = new ClientRequest(request);
+ URI newUri = replaceWithNew(host, currentRequest.getUri());
+ currentRequest.setUri(newUri);
- BoundRequestBuilder builder = httpClient.prepare(jerseyRequest.getMethod(), jerseyRequest.getUri().toString());
+ BoundRequestBuilder builder = httpClient.prepare(currentRequest.getMethod(), currentRequest.getUri().toString());
- if (jerseyRequest.hasEntity()) {
+ if (currentRequest.hasEntity()) {
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
- jerseyRequest.setStreamProvider(contentLength -> outStream);
+ currentRequest.setStreamProvider(contentLength -> outStream);
try {
- jerseyRequest.writeEntity();
+ currentRequest.writeEntity();
} catch (IOException e) {
- future.completeExceptionally(e);
- return future;
+ CompletableFuture<Response> r = new CompletableFuture<>();
+ r.completeExceptionally(e);
+ return r;
}
builder.setBody(outStream.toByteArray());
}
- jerseyRequest.getHeaders().forEach((key, headers) -> {
+ currentRequest.getHeaders().forEach((key, headers) -> {
if (!HttpHeaders.USER_AGENT.equals(key)) {
builder.addHeader(key, headers);
}
});
- builder.execute(new AsyncCompletionHandler<Response>() {
- @Override
- public Response onCompleted(Response response) throws Exception {
- ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()),
- jerseyRequest);
- response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
- if (response.hasResponseBody()) {
- jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
- }
- callback.response(jerseyResponse);
- future.complete(jerseyResponse);
- return response;
- }
-
- @Override
- public void onThrowable(Throwable t) {
- callback.failure(t);
- future.completeExceptionally(t);
- }
- });
+ return builder.execute().toCompletableFuture();
+ }
- return future;
+ public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
+ CompletableFuture<T> result = new CompletableFuture<>();
+ delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
+ return result;
}
@Override