You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/18 06:10:16 UTC

[pulsar] branch master updated: Handle multi-hosts in PulsarAdmin for both sync and async requests. (#6547)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1517d19  Handle multi-hosts in PulsarAdmin for both sync and async requests. (#6547)
1517d19 is described below

commit 1517d1925b8448c04f27b1a1a0654ed313062e37
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Wed Mar 18 14:10:07 2020 +0800

    Handle multi-hosts in PulsarAdmin for both sync and async requests. (#6547)
    
    This PR unifies all request logic for PulsarAdmin in the async path.
    
    For each PulsarAdmin request, the connector retries until it gets a successful response; the request fails once the retry count is exhausted or the read timeout is reached.
---
 .../admin/internal/http/AsyncHttpConnector.java    | 201 +++++++++++----------
 1 file changed, 104 insertions(+), 97 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index c90bf60..b9f3899 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -26,19 +26,22 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
-import javax.ws.rs.ProcessingException;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.Status;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -50,7 +53,6 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.util.SecurityUtility;
-import org.asynchttpclient.AsyncCompletionHandler;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
@@ -69,7 +71,11 @@ public class AsyncHttpConnector implements Connector {
 
     @Getter
     private final AsyncHttpClient httpClient;
+    private final int readTimeout;
+    private final int maxRetries;
     private final PulsarServiceNameResolver serviceNameResolver;
+    private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
+            new DefaultThreadFactory("delayer"));
 
     public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
         this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
@@ -117,142 +123,143 @@ public class AsyncHttpConnector implements Connector {
             }
         }
         httpClient = new DefaultAsyncHttpClient(confBuilder.build());
+        this.readTimeout = readTimeoutMs;
+        this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
     }
 
+    @Override
     public ClientResponse apply(ClientRequest jerseyRequest) {
-
         CompletableFuture<ClientResponse> future = new CompletableFuture<>();
-        long startTime = System.currentTimeMillis();
-        Throwable lastException = null;
-        Set<InetSocketAddress> triedAddresses = new HashSet<>();
-
-        while (true) {
-            InetSocketAddress address = serviceNameResolver.resolveHost();
-            if (triedAddresses.contains(address)) {
-                // We already tried all available addresses
-                throw new ProcessingException((lastException.getMessage()), lastException);
+        apply(jerseyRequest, new AsyncConnectorCallback() {
+            @Override
+            public void response(ClientResponse response) {
+                future.complete(response);
             }
 
-            triedAddresses.add(address);
-            URI requestUri = replaceWithNew(address, jerseyRequest.getUri());
-            jerseyRequest.setUri(requestUri);
-            CompletableFuture<ClientResponse> tempFuture = new CompletableFuture<>();
-            try {
-                resolveRequest(tempFuture, jerseyRequest);
-                if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
-                    throw new ProcessingException(
-                        "Request timeout, the last try service url is : " + jerseyRequest.getUri().toString());
-                }
-            } catch (ExecutionException ex) {
-                Throwable e = ex.getCause() == null ? ex : ex.getCause();
-                if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
-                    throw new ProcessingException((e.getMessage()), e);
-                }
-                lastException = e;
-                continue;
-            } catch (Exception e) {
-                if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
-                    throw new ProcessingException(e.getMessage(), e);
-                }
-                lastException = e;
-                continue;
+            @Override
+            public void failure(Throwable failure) {
+                future.completeExceptionally(failure);
             }
-            future = tempFuture;
-            break;
+        });
+        try {
+            return future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            log.error(e.getMessage());
         }
-
-        return future.join();
+        return null;
     }
 
     private URI replaceWithNew(InetSocketAddress address, URI uri) {
         String originalUri = uri.toString();
         String newUri = (originalUri.split(":")[0] + "://")
-                        + address.getHostName() + ":"
-                        + address.getPort()
-                        + uri.getRawPath();
+                + address.getHostName() + ":"
+                + address.getPort()
+                + uri.getRawPath();
         if (uri.getRawQuery() != null) {
             newUri += "?" + uri.getRawQuery();
         }
         return URI.create(newUri);
     }
 
-
-
-    private void resolveRequest(CompletableFuture<ClientResponse> future,
-                                ClientRequest jerseyRequest)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
-            @Override
-            public void response(ClientResponse response) {
-                future.complete(response);
-            }
-            @Override
-            public void failure(Throwable failure) {
-                future.completeExceptionally(failure);
+    @Override
+    public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) {
+        CompletableFuture<Response> responseFuture = retryOrTimeOut(jerseyRequest);
+        responseFuture.whenComplete(((response, throwable) -> {
+            if (throwable != null) {
+                callback.failure(throwable);
+            } else {
+                ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()), jerseyRequest);
+                response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
+                if (response.hasResponseBody()) {
+                    jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
+                }
+                callback.response(jerseyResponse);
             }
-        });
+        }));
+        return responseFuture;
+    }
 
-        Integer timeout = httpClient.getConfig().getRequestTimeout() / 3;
+    private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
+        final CompletableFuture<Response> resultFuture = new CompletableFuture<>();
+        retryOperation(resultFuture, () -> oneShot(serviceNameResolver.resolveHost(), request), maxRetries);
+        CompletableFuture<Response> timeoutAfter = timeoutAfter(readTimeout, TimeUnit.MILLISECONDS);
+        return resultFuture.applyToEither(timeoutAfter, Function.identity());
+    }
 
-        Object result = null;
-        if (timeout != null && timeout > 0) {
-            result = resultFuture.get(timeout, TimeUnit.MILLISECONDS);
-        } else {
-            result = resultFuture.get();
+    private <T> void retryOperation(
+            final CompletableFuture<T> resultFuture,
+            final Supplier<CompletableFuture<T>> operation,
+            final int retries) {
+
+        if (!resultFuture.isDone()) {
+            final CompletableFuture<T> operationFuture = operation.get();
+
+            operationFuture.whenComplete(
+                    (t, throwable) -> {
+                        if (throwable != null) {
+                            if (throwable instanceof CancellationException) {
+                                resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+                            } else {
+                                if (retries > 0) {
+                                    retryOperation(
+                                            resultFuture,
+                                            operation,
+                                            retries - 1);
+                                } else {
+                                    resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+                                            "has been exhausted.", throwable));
+                                }
+                            }
+                        } else {
+                            resultFuture.complete(t);
+                        }
+                    });
+
+            resultFuture.whenComplete(
+                    (t, throwable) -> operationFuture.cancel(false));
         }
+    }
 
-        if (result != null && result instanceof Throwable) {
-            throw new ExecutionException((Throwable) result);
+    public static class RetryException extends Exception {
+        public RetryException(String message, Throwable cause) {
+            super(message, cause);
         }
     }
 
-    @Override
-    public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) {
-        final CompletableFuture<Object> future = new CompletableFuture<>();
+    private CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) {
+        ClientRequest currentRequest = new ClientRequest(request);
+        URI newUri = replaceWithNew(host, currentRequest.getUri());
+        currentRequest.setUri(newUri);
 
-        BoundRequestBuilder builder = httpClient.prepare(jerseyRequest.getMethod(), jerseyRequest.getUri().toString());
+        BoundRequestBuilder builder = httpClient.prepare(currentRequest.getMethod(), currentRequest.getUri().toString());
 
-        if (jerseyRequest.hasEntity()) {
+        if (currentRequest.hasEntity()) {
             ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-            jerseyRequest.setStreamProvider(contentLength -> outStream);
+            currentRequest.setStreamProvider(contentLength -> outStream);
             try {
-                jerseyRequest.writeEntity();
+                currentRequest.writeEntity();
             } catch (IOException e) {
-                future.completeExceptionally(e);
-                return future;
+                CompletableFuture<Response> r = new CompletableFuture<>();
+                r.completeExceptionally(e);
+                return r;
             }
 
             builder.setBody(outStream.toByteArray());
         }
 
-        jerseyRequest.getHeaders().forEach((key, headers) -> {
+        currentRequest.getHeaders().forEach((key, headers) -> {
             if (!HttpHeaders.USER_AGENT.equals(key)) {
                 builder.addHeader(key, headers);
             }
         });
 
-        builder.execute(new AsyncCompletionHandler<Response>() {
-            @Override
-            public Response onCompleted(Response response) throws Exception {
-                ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()),
-                        jerseyRequest);
-                response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
-                if (response.hasResponseBody()) {
-                    jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
-                }
-                callback.response(jerseyResponse);
-                future.complete(jerseyResponse);
-                return response;
-            }
-
-            @Override
-            public void onThrowable(Throwable t) {
-                callback.failure(t);
-                future.completeExceptionally(t);
-            }
-        });
+        return builder.execute().toCompletableFuture();
+    }
 
-        return future;
+    public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
+        CompletableFuture<T> result = new CompletableFuture<>();
+        delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
+        return result;
     }
 
     @Override