You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/04 18:14:24 UTC

[pulsar] branch branch-2.3 updated: Fixed issue with Authorization header missing after client gets redirected (#3869)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new a15b6f0  Fixed issue with Authorization header missing after client gets redirected (#3869)
a15b6f0 is described below

commit a15b6f073e6a1752c5f1169c072756b7efa7f762
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Apr 4 13:11:45 2019 -0500

    Fixed issue with Authorization header missing after client gets redirected (#3869)
    
    * Fixed issue with Authorization header missing after client gets redirected
    
    * Use AsyncHttpClient instead
    
    * Fixed merge issue
    
    * Convert headers after body to not break multipart encoding
    
    * Fixed tests
    
    * Added missing property file for AsyncHttpClient in the shaded jar
    
    * Moved ahc.properties into org.asynchttpclient.config
    
    * One value was missing from properties file
    
    * Also add default file for non-shaded scenario
    
    * Another missing AHC config key
    
    * Print topic load exception
    
    * Removed constraint on unit tests executor
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../broker/auth/SameThreadOrderedSafeExecutor.java |   2 +-
 pulsar-client-admin-shaded/pom.xml                 |   6 +
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  68 ++-----
 .../admin/internal/http/AsyncHttpConnector.java    | 201 +++++++++++++++++++++
 .../internal/http/AsyncHttpConnectorProvider.java  |  40 ++++
 .../asynchttpclient/config/ahc-default.properties  |  73 ++++++++
 .../org/asynchttpclient/config/ahc.properties      |  73 ++++++++
 .../token/PulsarTokenAuthenticationBaseSuite.java  |  48 ++++-
 9 files changed, 456 insertions(+), 59 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 99a8930..58ddcdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -300,7 +300,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             }
             log.info("Started Pulsar Broker service on port {}", port.get());
         }
-        
+
         Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
         if (tlsPort.isPresent()) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
@@ -491,7 +491,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             if (cause instanceof ServiceUnitNotReadyException) {
                 log.warn("[{}] Service unit is not ready when loading the topic", topic);
             } else {
-                log.warn("[{}] Unexpected exception when loading topic: {}", topic, cause);
+                log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e);
             }
 
             return failedFuture(cause);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
index 3f74804..0df2b23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
@@ -35,7 +35,7 @@ public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
             false,
             false,
             100000,
-            10,
+            -1,
             false);
     }
 
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index c641c2b..4b36736 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -104,6 +104,12 @@
                     <include>**</include>
                   </includes>
                 </filter>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-client-admin-original</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
               </filters>
               <relocations>
                <relocation>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index deb9722..4731879 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pulsar.client.admin;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
 import org.apache.pulsar.client.admin.internal.BookiesImpl;
 import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
 import org.apache.pulsar.client.admin.internal.BrokersImpl;
@@ -38,13 +45,12 @@ import org.apache.pulsar.client.admin.internal.SourceImpl;
 import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.admin.internal.WorkerImpl;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.common.util.SecurityUtility;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.jackson.JacksonFeature;
@@ -53,17 +59,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.WebTarget;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URL;
-import java.security.cert.X509Certificate;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Pulsar client admin API client.
  */
@@ -155,6 +150,7 @@ public class PulsarAdmin implements Closeable {
         httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
         httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
         httpConfig.register(MultiPartFeature.class);
+        httpConfig.connectorProvider(new AsyncHttpConnectorProvider(clientConfigData));
 
         ClientBuilder clientBuilder = ClientBuilder.newBuilder()
                 .withConfig(httpConfig)
@@ -162,45 +158,7 @@ public class PulsarAdmin implements Closeable {
                 .readTimeout(this.readTimeout, this.readTimeoutUnit)
                 .register(JacksonConfigurator.class).register(JacksonFeature.class);
 
-        boolean useTls = false;
-
-        if (clientConfigData != null && StringUtils.isNotBlank(clientConfigData.getServiceUrl())
-                && clientConfigData.getServiceUrl().startsWith("https://")) {
-            useTls = true;
-            try {
-                SSLContext sslCtx = null;
-
-                X509Certificate trustCertificates[] = SecurityUtility
-                        .loadCertificatesFromPemFile(clientConfigData.getTlsTrustCertsFilePath());
-
-                // Set private key and certificate if available
-                AuthenticationDataProvider authData = auth.getAuthData();
-                if (authData.hasDataForTls()) {
-                    sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(),
-                            trustCertificates, authData.getTlsCertificates(), authData.getTlsPrivateKey());
-                } else {
-                    sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(),
-                            trustCertificates);
-                }
-
-                clientBuilder.sslContext(sslCtx);
-                if (clientConfigData.isTlsHostnameVerificationEnable()) {
-                    clientBuilder.hostnameVerifier(new DefaultHostnameVerifier());
-                } else {
-                    // Disable hostname verification
-                    clientBuilder.hostnameVerifier(NoopHostnameVerifier.INSTANCE);
-                }
-            } catch (Exception e) {
-                try {
-                    if (auth != null) {
-                        auth.close();
-                    }
-                } catch (IOException ioe) {
-                    LOG.error("Failed to close the authentication service", ioe);
-                }
-                throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
-            }
-        }
+        boolean useTls = clientConfigData.getServiceUrl().startsWith("https://");
 
         this.client = clientBuilder.build();
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
new file mode 100644
index 0000000..fd89108
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response.Status;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.SecurityUtility;
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
+import org.glassfish.jersey.client.spi.Connector;
+
+@Slf4j
+public class AsyncHttpConnector implements Connector {
+
+    private final AsyncHttpClient httpClient;
+
+    @SneakyThrows
+    public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
+        DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+        confBuilder.setFollowRedirect(true);
+        confBuilder.setConnectTimeout((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT));
+        confBuilder.setReadTimeout((int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT));
+        confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+        confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
+            @Override
+            public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) {
+                // Close connection upon a server error or per HTTP spec
+                return (response.status().code() / 100 != 5) && super.keepAlive(ahcRequest, request, response);
+            }
+        });
+
+        if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())
+                && conf.getServiceUrl().startsWith("https://")) {
+
+            SslContext sslCtx = null;
+
+            // Set client key and certificate if available
+            AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
+            if (authData.hasDataForTls()) {
+                sslCtx = SecurityUtility.createNettySslContextForClient(
+                        conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+                        conf.getTlsTrustCertsFilePath(),
+                        authData.getTlsCertificates(), authData.getTlsPrivateKey());
+            } else {
+                sslCtx = SecurityUtility.createNettySslContextForClient(
+                        conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+                        conf.getTlsTrustCertsFilePath());
+            }
+
+            confBuilder.setSslContext(sslCtx);
+        }
+        httpClient = new DefaultAsyncHttpClient(confBuilder.build());
+    }
+
+    @Override
+    public ClientResponse apply(ClientRequest jerseyRequest) {
+        CompletableFuture<ClientResponse> future = new CompletableFuture<>();
+
+        try {
+            Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
+                @Override
+                public void response(ClientResponse response) {
+                    future.complete(response);
+                }
+
+                @Override
+                public void failure(Throwable failure) {
+                    future.completeExceptionally(failure);
+                }
+            });
+
+            Integer timeout = ClientProperties.getValue(
+                    jerseyRequest.getConfiguration().getProperties(),
+                    ClientProperties.READ_TIMEOUT, 0);
+
+            if (timeout != null && timeout > 0) {
+                resultFuture.get(timeout, TimeUnit.MILLISECONDS);
+            } else {
+                resultFuture.get();
+            }
+        } catch (ExecutionException ex) {
+            Throwable e = ex.getCause() == null ? ex : ex.getCause();
+            throw new ProcessingException(e.getMessage(), e);
+        } catch (Exception ex) {
+            throw new ProcessingException(ex.getMessage(), ex);
+        }
+
+        return future.join();
+    }
+
+    @Override
+    public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) {
+        final CompletableFuture<Object> future = new CompletableFuture<>();
+
+        BoundRequestBuilder builder = httpClient.prepare(jerseyRequest.getMethod(), jerseyRequest.getUri().toString());
+
+        if (jerseyRequest.hasEntity()) {
+            ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+            jerseyRequest.setStreamProvider(contentLength -> outStream);
+            try {
+                jerseyRequest.writeEntity();
+            } catch (IOException e) {
+                future.completeExceptionally(e);
+                return future;
+            }
+
+            builder.setBody(outStream.toByteArray());
+        }
+
+        jerseyRequest.getHeaders().forEach((key, headers) -> {
+            if (!HttpHeaders.USER_AGENT.equals(key)) {
+                builder.addHeader(key, headers);
+            }
+        });
+
+        builder.execute(new AsyncCompletionHandler<Response>() {
+            @Override
+            public Response onCompleted(Response response) throws Exception {
+                ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()),
+                        jerseyRequest);
+                response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
+                if (response.hasResponseBody()) {
+                    jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
+                }
+                callback.response(jerseyResponse);
+                future.complete(jerseyResponse);
+                return response;
+            }
+
+            @Override
+            public void onThrowable(Throwable t) {
+                callback.failure(t);
+                future.completeExceptionally(t);
+            }
+        });
+
+        return future;
+    }
+
+    @Override
+    public String getName() {
+        return "Pulsar-Admin";
+    }
+
+    @Override
+    public void close() {
+        try {
+            httpClient.close();
+        } catch (IOException e) {
+            log.warn("Failed to close http client", e);
+        }
+    }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
new file mode 100644
index 0000000..2f24089
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Configuration;
+
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.glassfish.jersey.client.spi.Connector;
+import org.glassfish.jersey.client.spi.ConnectorProvider;
+
+public class AsyncHttpConnectorProvider implements ConnectorProvider {
+
+    private final ClientConfigurationData conf;
+
+    public AsyncHttpConnectorProvider(ClientConfigurationData conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public Connector getConnector(Client client, Configuration runtimeConfig) {
+        return new AsyncHttpConnector(client, conf);
+    }
+}
diff --git a/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties
new file mode 100644
index 0000000..fa74eb6
--- /dev/null
+++ b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file is used by the shaded asynchttpclient packaged in pulsar-client-${VERSION}-shaded.jar.
+# For more details please refer https://github.com/apache/pulsar/issues/389
+
+org.asynchttpclient.threadPoolName=AsyncHttpClient
+org.asynchttpclient.maxConnections=-1
+org.asynchttpclient.maxConnectionsPerHost=-1
+org.asynchttpclient.connectTimeout=5000
+org.asynchttpclient.pooledConnectionIdleTimeout=60000
+org.asynchttpclient.connectionPoolCleanerPeriod=1000
+org.asynchttpclient.readTimeout=60000
+org.asynchttpclient.requestTimeout=60000
+org.asynchttpclient.connectionTtl=-1
+org.asynchttpclient.followRedirect=false
+org.asynchttpclient.maxRedirects=5
+org.asynchttpclient.compressionEnforced=false
+org.asynchttpclient.userAgent=AHC/2.1
+org.asynchttpclient.enabledProtocols=TLSv1.2, TLSv1.1, TLSv1
+org.asynchttpclient.enabledCipherSuites=
+org.asynchttpclient.useProxySelector=false
+org.asynchttpclient.useProxyProperties=false
+org.asynchttpclient.validateResponseHeaders=true
+org.asynchttpclient.aggregateWebSocketFrameFragments=true
+org.asynchttpclient.strict302Handling=false
+org.asynchttpclient.keepAlive=true
+org.asynchttpclient.maxRequestRetry=5
+org.asynchttpclient.disableUrlEncodingForBoundRequests=false
+org.asynchttpclient.useLaxCookieEncoder=false
+org.asynchttpclient.removeQueryParamOnRedirect=true
+org.asynchttpclient.useOpenSsl=false
+org.asynchttpclient.useInsecureTrustManager=false
+org.asynchttpclient.disableHttpsEndpointIdentificationAlgorithm=false
+org.asynchttpclient.sslSessionCacheSize=0
+org.asynchttpclient.sslSessionTimeout=0
+org.asynchttpclient.tcpNoDelay=true
+org.asynchttpclient.soReuseAddress=false
+org.asynchttpclient.soLinger=-1
+org.asynchttpclient.soSndBuf=-1
+org.asynchttpclient.soRcvBuf=-1
+org.asynchttpclient.httpClientCodecMaxInitialLineLength=4096
+org.asynchttpclient.httpClientCodecMaxHeaderSize=8192
+org.asynchttpclient.httpClientCodecMaxChunkSize=8192
+org.asynchttpclient.httpClientCodecInitialBufferSize=128
+org.asynchttpclient.disableZeroCopy=false
+org.asynchttpclient.handshakeTimeout=10000
+org.asynchttpclient.chunkedFileChunkSize=8192
+org.asynchttpclient.webSocketMaxBufferSize=128000000
+org.asynchttpclient.webSocketMaxFrameSize=10240
+org.asynchttpclient.keepEncodingHeader=false
+org.asynchttpclient.shutdownQuietPeriod=2000
+org.asynchttpclient.shutdownTimeout=15000
+org.asynchttpclient.useNativeTransport=false
+org.asynchttpclient.ioThreadsCount=0
+org.asynchttpclient.acquireFreeChannelTimeout=0
+org.asynchttpclient.enableWebSocketCompression=true
\ No newline at end of file
diff --git a/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties
new file mode 100644
index 0000000..914456a
--- /dev/null
+++ b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file is used by the shaded asynchttpclient packaged in pulsar-client-${VERSION}-shaded.jar.
+# For more details please refer https://github.com/apache/pulsar/issues/389
+
+org.apache.pulsar.shade.org.asynchttpclient.threadPoolName=AsyncHttpClient
+org.apache.pulsar.shade.org.asynchttpclient.maxConnections=-1
+org.apache.pulsar.shade.org.asynchttpclient.maxConnectionsPerHost=-1
+org.apache.pulsar.shade.org.asynchttpclient.connectTimeout=5000
+org.apache.pulsar.shade.org.asynchttpclient.pooledConnectionIdleTimeout=60000
+org.apache.pulsar.shade.org.asynchttpclient.connectionPoolCleanerPeriod=1000
+org.apache.pulsar.shade.org.asynchttpclient.readTimeout=60000
+org.apache.pulsar.shade.org.asynchttpclient.requestTimeout=60000
+org.apache.pulsar.shade.org.asynchttpclient.connectionTtl=-1
+org.apache.pulsar.shade.org.asynchttpclient.followRedirect=false
+org.apache.pulsar.shade.org.asynchttpclient.maxRedirects=5
+org.apache.pulsar.shade.org.asynchttpclient.compressionEnforced=false
+org.apache.pulsar.shade.org.asynchttpclient.userAgent=AHC/2.1
+org.apache.pulsar.shade.org.asynchttpclient.enabledProtocols=TLSv1.2, TLSv1.1, TLSv1
+org.apache.pulsar.shade.org.asynchttpclient.enabledCipherSuites=
+org.apache.pulsar.shade.org.asynchttpclient.useProxySelector=false
+org.apache.pulsar.shade.org.asynchttpclient.useProxyProperties=false
+org.apache.pulsar.shade.org.asynchttpclient.validateResponseHeaders=true
+org.apache.pulsar.shade.org.asynchttpclient.aggregateWebSocketFrameFragments=true
+org.apache.pulsar.shade.org.asynchttpclient.strict302Handling=false
+org.apache.pulsar.shade.org.asynchttpclient.keepAlive=true
+org.apache.pulsar.shade.org.asynchttpclient.maxRequestRetry=5
+org.apache.pulsar.shade.org.asynchttpclient.disableUrlEncodingForBoundRequests=false
+org.apache.pulsar.shade.org.asynchttpclient.useLaxCookieEncoder=false
+org.apache.pulsar.shade.org.asynchttpclient.removeQueryParamOnRedirect=true
+org.apache.pulsar.shade.org.asynchttpclient.useOpenSsl=false
+org.apache.pulsar.shade.org.asynchttpclient.useInsecureTrustManager=false
+org.apache.pulsar.shade.org.asynchttpclient.disableHttpsEndpointIdentificationAlgorithm=false
+org.apache.pulsar.shade.org.asynchttpclient.sslSessionCacheSize=0
+org.apache.pulsar.shade.org.asynchttpclient.sslSessionTimeout=0
+org.apache.pulsar.shade.org.asynchttpclient.tcpNoDelay=true
+org.apache.pulsar.shade.org.asynchttpclient.soReuseAddress=false
+org.apache.pulsar.shade.org.asynchttpclient.soLinger=-1
+org.apache.pulsar.shade.org.asynchttpclient.soSndBuf=-1
+org.apache.pulsar.shade.org.asynchttpclient.soRcvBuf=-1
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxInitialLineLength=4096
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxHeaderSize=8192
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxChunkSize=8192
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecInitialBufferSize=128
+org.apache.pulsar.shade.org.asynchttpclient.disableZeroCopy=false
+org.apache.pulsar.shade.org.asynchttpclient.handshakeTimeout=10000
+org.apache.pulsar.shade.org.asynchttpclient.chunkedFileChunkSize=8192
+org.apache.pulsar.shade.org.asynchttpclient.webSocketMaxBufferSize=128000000
+org.apache.pulsar.shade.org.asynchttpclient.webSocketMaxFrameSize=10240
+org.apache.pulsar.shade.org.asynchttpclient.keepEncodingHeader=false
+org.apache.pulsar.shade.org.asynchttpclient.shutdownQuietPeriod=2000
+org.apache.pulsar.shade.org.asynchttpclient.shutdownTimeout=15000
+org.apache.pulsar.shade.org.asynchttpclient.useNativeTransport=false
+org.apache.pulsar.shade.org.asynchttpclient.ioThreadsCount=0
+org.apache.pulsar.shade.org.asynchttpclient.acquireFreeChannelTimeout=0
+org.apache.pulsar.shade.org.org.asynchttpclient.enableWebSocketCompression=true
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index 5606811..a0064ae 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -89,7 +89,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
 
         PulsarClusterSpec spec = PulsarClusterSpec.builder()
                 .numBookies(2)
-                .numBrokers(1)
+                .numBrokers(2)
                 .numProxies(1)
                 .clusterName(clusterName)
                 .build();
@@ -106,6 +106,8 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
                     "org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
             brokerContainer.withEnv("authorizationEnabled", "true");
             brokerContainer.withEnv("superUserRoles", SUPER_USER_ROLE + "," + PROXY_ROLE);
+            brokerContainer.withEnv("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+            brokerContainer.withEnv("brokerClientAuthenticationParameters", "token:" + superUserAuthToken);
         }
 
         ProxyContainer proxyContainer = pulsarCluster.getProxy();
@@ -201,4 +203,48 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
             // Expected
         }
     }
+
+    @Test
+    public void testProxyRedirectWithTokenAuth() throws Exception {
+
+        final String tenant = "token-test-tenant" + randomName(4);
+        final String namespace = tenant + "/ns-1";
+        final String topic = namespace + "/my-topic-1";
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .authentication(AuthenticationFactory.token(superUserAuthToken))
+                .build();
+
+        try {
+            admin.tenants().createTenant(tenant,
+                    new TenantInfo(Collections.singleton(REGULAR_USER_ROLE),
+                            Collections.singleton(pulsarCluster.getClusterName())));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName()));
+        admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE,
+                EnumSet.allOf(AuthAction.class));
+
+        admin.topics().createPartitionedTopic(topic, 16);
+
+        // Create the partitions
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .authentication(AuthenticationFactory.token(REGULAR_USER_ROLE))
+                .build();
+
+        // Force the topics to be created
+        client.newProducer()
+                .topic(topic)
+                .create()
+                .close();
+
+        admin.topics().getList(namespace);
+    }
 }