You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/04 18:14:24 UTC
[pulsar] branch branch-2.3 updated: Fixed issue with Authorization
header missing after client gets redirected (#3869)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new a15b6f0 Fixed issue with Authorization header missing after client gets redirected (#3869)
a15b6f0 is described below
commit a15b6f073e6a1752c5f1169c072756b7efa7f762
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Apr 4 13:11:45 2019 -0500
Fixed issue with Authorization header missing after client gets redirected (#3869)
* Fixed issue with Authorization header missing after client gets redirected
* Use AsyncHttpClient instead
* Fixed merge issue
* Convert headers after body to not break multipart encoding
* Fixed tests
* Added missing property file for AsyncHttpClient in the shaded jar
* Moved ahc.properties into org.asynchttpclient.config
* One value was missing from properties file
* Also add default file for non-shaded scenario
* Another missing AHC config key
* Print topic load exception
* Removed constraint on unit tests executor
---
.../pulsar/broker/service/BrokerService.java | 4 +-
.../broker/auth/SameThreadOrderedSafeExecutor.java | 2 +-
pulsar-client-admin-shaded/pom.xml | 6 +
.../apache/pulsar/client/admin/PulsarAdmin.java | 68 ++-----
.../admin/internal/http/AsyncHttpConnector.java | 201 +++++++++++++++++++++
.../internal/http/AsyncHttpConnectorProvider.java | 40 ++++
.../asynchttpclient/config/ahc-default.properties | 73 ++++++++
.../org/asynchttpclient/config/ahc.properties | 73 ++++++++
.../token/PulsarTokenAuthenticationBaseSuite.java | 48 ++++-
9 files changed, 456 insertions(+), 59 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 99a8930..58ddcdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -300,7 +300,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
log.info("Started Pulsar Broker service on port {}", port.get());
}
-
+
Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
if (tlsPort.isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
@@ -491,7 +491,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
if (cause instanceof ServiceUnitNotReadyException) {
log.warn("[{}] Service unit is not ready when loading the topic", topic);
} else {
- log.warn("[{}] Unexpected exception when loading topic: {}", topic, cause);
+ log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e);
}
return failedFuture(cause);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
index 3f74804..0df2b23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
@@ -35,7 +35,7 @@ public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
false,
false,
100000,
- 10,
+ -1,
false);
}
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index c641c2b..4b36736 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -104,6 +104,12 @@
<include>**</include>
</includes>
</filter>
+ <filter>
+ <artifact>org.apache.pulsar:pulsar-client-admin-original</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
</filters>
<relocations>
<relocation>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index deb9722..4731879 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -18,9 +18,16 @@
*/
package org.apache.pulsar.client.admin;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
import org.apache.pulsar.client.admin.internal.BookiesImpl;
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.admin.internal.BrokersImpl;
@@ -38,13 +45,12 @@ import org.apache.pulsar.client.admin.internal.SourceImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.WorkerImpl;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.common.util.SecurityUtility;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.JacksonFeature;
@@ -53,17 +59,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.WebTarget;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URL;
-import java.security.cert.X509Certificate;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
/**
* Pulsar client admin API client.
*/
@@ -155,6 +150,7 @@ public class PulsarAdmin implements Closeable {
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
httpConfig.register(MultiPartFeature.class);
+ httpConfig.connectorProvider(new AsyncHttpConnectorProvider(clientConfigData));
ClientBuilder clientBuilder = ClientBuilder.newBuilder()
.withConfig(httpConfig)
@@ -162,45 +158,7 @@ public class PulsarAdmin implements Closeable {
.readTimeout(this.readTimeout, this.readTimeoutUnit)
.register(JacksonConfigurator.class).register(JacksonFeature.class);
- boolean useTls = false;
-
- if (clientConfigData != null && StringUtils.isNotBlank(clientConfigData.getServiceUrl())
- && clientConfigData.getServiceUrl().startsWith("https://")) {
- useTls = true;
- try {
- SSLContext sslCtx = null;
-
- X509Certificate trustCertificates[] = SecurityUtility
- .loadCertificatesFromPemFile(clientConfigData.getTlsTrustCertsFilePath());
-
- // Set private key and certificate if available
- AuthenticationDataProvider authData = auth.getAuthData();
- if (authData.hasDataForTls()) {
- sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(),
- trustCertificates, authData.getTlsCertificates(), authData.getTlsPrivateKey());
- } else {
- sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(),
- trustCertificates);
- }
-
- clientBuilder.sslContext(sslCtx);
- if (clientConfigData.isTlsHostnameVerificationEnable()) {
- clientBuilder.hostnameVerifier(new DefaultHostnameVerifier());
- } else {
- // Disable hostname verification
- clientBuilder.hostnameVerifier(NoopHostnameVerifier.INSTANCE);
- }
- } catch (Exception e) {
- try {
- if (auth != null) {
- auth.close();
- }
- } catch (IOException ioe) {
- LOG.error("Failed to close the authentication service", ioe);
- }
- throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
- }
- }
+ boolean useTls = clientConfigData.getServiceUrl().startsWith("https://");
this.client = clientBuilder.build();
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
new file mode 100644
index 0000000..fd89108
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response.Status;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.SecurityUtility;
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+import org.glassfish.jersey.client.ClientResponse;
+import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
+import org.glassfish.jersey.client.spi.Connector;
+
+@Slf4j
+public class AsyncHttpConnector implements Connector {
+
+ private final AsyncHttpClient httpClient;
+
+ @SneakyThrows
+ public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
+ DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setFollowRedirect(true);
+ confBuilder.setConnectTimeout((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT));
+ confBuilder.setReadTimeout((int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT));
+ confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+ confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
+ @Override
+ public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) {
+ // Close connection upon a server error or per HTTP spec
+ return (response.status().code() / 100 != 5) && super.keepAlive(ahcRequest, request, response);
+ }
+ });
+
+ if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())
+ && conf.getServiceUrl().startsWith("https://")) {
+
+ SslContext sslCtx = null;
+
+ // Set client key and certificate if available
+ AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
+ if (authData.hasDataForTls()) {
+ sslCtx = SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+ conf.getTlsTrustCertsFilePath(),
+ authData.getTlsCertificates(), authData.getTlsPrivateKey());
+ } else {
+ sslCtx = SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+ conf.getTlsTrustCertsFilePath());
+ }
+
+ confBuilder.setSslContext(sslCtx);
+ }
+ httpClient = new DefaultAsyncHttpClient(confBuilder.build());
+ }
+
+ @Override
+ public ClientResponse apply(ClientRequest jerseyRequest) {
+ CompletableFuture<ClientResponse> future = new CompletableFuture<>();
+
+ try {
+ Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
+ @Override
+ public void response(ClientResponse response) {
+ future.complete(response);
+ }
+
+ @Override
+ public void failure(Throwable failure) {
+ future.completeExceptionally(failure);
+ }
+ });
+
+ Integer timeout = ClientProperties.getValue(
+ jerseyRequest.getConfiguration().getProperties(),
+ ClientProperties.READ_TIMEOUT, 0);
+
+ if (timeout != null && timeout > 0) {
+ resultFuture.get(timeout, TimeUnit.MILLISECONDS);
+ } else {
+ resultFuture.get();
+ }
+ } catch (ExecutionException ex) {
+ Throwable e = ex.getCause() == null ? ex : ex.getCause();
+ throw new ProcessingException(e.getMessage(), e);
+ } catch (Exception ex) {
+ throw new ProcessingException(ex.getMessage(), ex);
+ }
+
+ return future.join();
+ }
+
+ @Override
+ public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) {
+ final CompletableFuture<Object> future = new CompletableFuture<>();
+
+ BoundRequestBuilder builder = httpClient.prepare(jerseyRequest.getMethod(), jerseyRequest.getUri().toString());
+
+ if (jerseyRequest.hasEntity()) {
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ jerseyRequest.setStreamProvider(contentLength -> outStream);
+ try {
+ jerseyRequest.writeEntity();
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ return future;
+ }
+
+ builder.setBody(outStream.toByteArray());
+ }
+
+ jerseyRequest.getHeaders().forEach((key, headers) -> {
+ if (!HttpHeaders.USER_AGENT.equals(key)) {
+ builder.addHeader(key, headers);
+ }
+ });
+
+ builder.execute(new AsyncCompletionHandler<Response>() {
+ @Override
+ public Response onCompleted(Response response) throws Exception {
+ ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()),
+ jerseyRequest);
+ response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
+ if (response.hasResponseBody()) {
+ jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
+ }
+ callback.response(jerseyResponse);
+ future.complete(jerseyResponse);
+ return response;
+ }
+
+ @Override
+ public void onThrowable(Throwable t) {
+ callback.failure(t);
+ future.completeExceptionally(t);
+ }
+ });
+
+ return future;
+ }
+
+ @Override
+ public String getName() {
+ return "Pulsar-Admin";
+ }
+
+ @Override
+ public void close() {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ log.warn("Failed to close http client", e);
+ }
+ }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
new file mode 100644
index 0000000..2f24089
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Configuration;
+
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.glassfish.jersey.client.spi.Connector;
+import org.glassfish.jersey.client.spi.ConnectorProvider;
+
+public class AsyncHttpConnectorProvider implements ConnectorProvider {
+
+ private final ClientConfigurationData conf;
+
+ public AsyncHttpConnectorProvider(ClientConfigurationData conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Connector getConnector(Client client, Configuration runtimeConfig) {
+ return new AsyncHttpConnector(client, conf);
+ }
+}
diff --git a/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties
new file mode 100644
index 0000000..fa74eb6
--- /dev/null
+++ b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file is used by the shaded asynchttpclient packaged in pulsar-client-${VERSION}-shaded.jar.
+# For more details please refer https://github.com/apache/pulsar/issues/389
+
+org.asynchttpclient.threadPoolName=AsyncHttpClient
+org.asynchttpclient.maxConnections=-1
+org.asynchttpclient.maxConnectionsPerHost=-1
+org.asynchttpclient.connectTimeout=5000
+org.asynchttpclient.pooledConnectionIdleTimeout=60000
+org.asynchttpclient.connectionPoolCleanerPeriod=1000
+org.asynchttpclient.readTimeout=60000
+org.asynchttpclient.requestTimeout=60000
+org.asynchttpclient.connectionTtl=-1
+org.asynchttpclient.followRedirect=false
+org.asynchttpclient.maxRedirects=5
+org.asynchttpclient.compressionEnforced=false
+org.asynchttpclient.userAgent=AHC/2.1
+org.asynchttpclient.enabledProtocols=TLSv1.2, TLSv1.1, TLSv1
+org.asynchttpclient.enabledCipherSuites=
+org.asynchttpclient.useProxySelector=false
+org.asynchttpclient.useProxyProperties=false
+org.asynchttpclient.validateResponseHeaders=true
+org.asynchttpclient.aggregateWebSocketFrameFragments=true
+org.asynchttpclient.strict302Handling=false
+org.asynchttpclient.keepAlive=true
+org.asynchttpclient.maxRequestRetry=5
+org.asynchttpclient.disableUrlEncodingForBoundRequests=false
+org.asynchttpclient.useLaxCookieEncoder=false
+org.asynchttpclient.removeQueryParamOnRedirect=true
+org.asynchttpclient.useOpenSsl=false
+org.asynchttpclient.useInsecureTrustManager=false
+org.asynchttpclient.disableHttpsEndpointIdentificationAlgorithm=false
+org.asynchttpclient.sslSessionCacheSize=0
+org.asynchttpclient.sslSessionTimeout=0
+org.asynchttpclient.tcpNoDelay=true
+org.asynchttpclient.soReuseAddress=false
+org.asynchttpclient.soLinger=-1
+org.asynchttpclient.soSndBuf=-1
+org.asynchttpclient.soRcvBuf=-1
+org.asynchttpclient.httpClientCodecMaxInitialLineLength=4096
+org.asynchttpclient.httpClientCodecMaxHeaderSize=8192
+org.asynchttpclient.httpClientCodecMaxChunkSize=8192
+org.asynchttpclient.httpClientCodecInitialBufferSize=128
+org.asynchttpclient.disableZeroCopy=false
+org.asynchttpclient.handshakeTimeout=10000
+org.asynchttpclient.chunkedFileChunkSize=8192
+org.asynchttpclient.webSocketMaxBufferSize=128000000
+org.asynchttpclient.webSocketMaxFrameSize=10240
+org.asynchttpclient.keepEncodingHeader=false
+org.asynchttpclient.shutdownQuietPeriod=2000
+org.asynchttpclient.shutdownTimeout=15000
+org.asynchttpclient.useNativeTransport=false
+org.asynchttpclient.ioThreadsCount=0
+org.asynchttpclient.acquireFreeChannelTimeout=0
+org.asynchttpclient.enableWebSocketCompression=true
\ No newline at end of file
diff --git a/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties
new file mode 100644
index 0000000..914456a
--- /dev/null
+++ b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file is used by the shaded asynchttpclient packaged in pulsar-client-${VERSION}-shaded.jar.
+# For more details please refer https://github.com/apache/pulsar/issues/389
+
+org.apache.pulsar.shade.org.asynchttpclient.threadPoolName=AsyncHttpClient
+org.apache.pulsar.shade.org.asynchttpclient.maxConnections=-1
+org.apache.pulsar.shade.org.asynchttpclient.maxConnectionsPerHost=-1
+org.apache.pulsar.shade.org.asynchttpclient.connectTimeout=5000
+org.apache.pulsar.shade.org.asynchttpclient.pooledConnectionIdleTimeout=60000
+org.apache.pulsar.shade.org.asynchttpclient.connectionPoolCleanerPeriod=1000
+org.apache.pulsar.shade.org.asynchttpclient.readTimeout=60000
+org.apache.pulsar.shade.org.asynchttpclient.requestTimeout=60000
+org.apache.pulsar.shade.org.asynchttpclient.connectionTtl=-1
+org.apache.pulsar.shade.org.asynchttpclient.followRedirect=false
+org.apache.pulsar.shade.org.asynchttpclient.maxRedirects=5
+org.apache.pulsar.shade.org.asynchttpclient.compressionEnforced=false
+org.apache.pulsar.shade.org.asynchttpclient.userAgent=AHC/2.1
+org.apache.pulsar.shade.org.asynchttpclient.enabledProtocols=TLSv1.2, TLSv1.1, TLSv1
+org.apache.pulsar.shade.org.asynchttpclient.enabledCipherSuites=
+org.apache.pulsar.shade.org.asynchttpclient.useProxySelector=false
+org.apache.pulsar.shade.org.asynchttpclient.useProxyProperties=false
+org.apache.pulsar.shade.org.asynchttpclient.validateResponseHeaders=true
+org.apache.pulsar.shade.org.asynchttpclient.aggregateWebSocketFrameFragments=true
+org.apache.pulsar.shade.org.asynchttpclient.strict302Handling=false
+org.apache.pulsar.shade.org.asynchttpclient.keepAlive=true
+org.apache.pulsar.shade.org.asynchttpclient.maxRequestRetry=5
+org.apache.pulsar.shade.org.asynchttpclient.disableUrlEncodingForBoundRequests=false
+org.apache.pulsar.shade.org.asynchttpclient.useLaxCookieEncoder=false
+org.apache.pulsar.shade.org.asynchttpclient.removeQueryParamOnRedirect=true
+org.apache.pulsar.shade.org.asynchttpclient.useOpenSsl=false
+org.apache.pulsar.shade.org.asynchttpclient.useInsecureTrustManager=false
+org.apache.pulsar.shade.org.asynchttpclient.disableHttpsEndpointIdentificationAlgorithm=false
+org.apache.pulsar.shade.org.asynchttpclient.sslSessionCacheSize=0
+org.apache.pulsar.shade.org.asynchttpclient.sslSessionTimeout=0
+org.apache.pulsar.shade.org.asynchttpclient.tcpNoDelay=true
+org.apache.pulsar.shade.org.asynchttpclient.soReuseAddress=false
+org.apache.pulsar.shade.org.asynchttpclient.soLinger=-1
+org.apache.pulsar.shade.org.asynchttpclient.soSndBuf=-1
+org.apache.pulsar.shade.org.asynchttpclient.soRcvBuf=-1
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxInitialLineLength=4096
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxHeaderSize=8192
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxChunkSize=8192
+org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecInitialBufferSize=128
+org.apache.pulsar.shade.org.asynchttpclient.disableZeroCopy=false
+org.apache.pulsar.shade.org.asynchttpclient.handshakeTimeout=10000
+org.apache.pulsar.shade.org.asynchttpclient.chunkedFileChunkSize=8192
+org.apache.pulsar.shade.org.asynchttpclient.webSocketMaxBufferSize=128000000
+org.apache.pulsar.shade.org.asynchttpclient.webSocketMaxFrameSize=10240
+org.apache.pulsar.shade.org.asynchttpclient.keepEncodingHeader=false
+org.apache.pulsar.shade.org.asynchttpclient.shutdownQuietPeriod=2000
+org.apache.pulsar.shade.org.asynchttpclient.shutdownTimeout=15000
+org.apache.pulsar.shade.org.asynchttpclient.useNativeTransport=false
+org.apache.pulsar.shade.org.asynchttpclient.ioThreadsCount=0
+org.apache.pulsar.shade.org.asynchttpclient.acquireFreeChannelTimeout=0
+org.apache.pulsar.shade.org.org.asynchttpclient.enableWebSocketCompression=true
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index 5606811..a0064ae 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -89,7 +89,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
PulsarClusterSpec spec = PulsarClusterSpec.builder()
.numBookies(2)
- .numBrokers(1)
+ .numBrokers(2)
.numProxies(1)
.clusterName(clusterName)
.build();
@@ -106,6 +106,8 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
"org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
brokerContainer.withEnv("authorizationEnabled", "true");
brokerContainer.withEnv("superUserRoles", SUPER_USER_ROLE + "," + PROXY_ROLE);
+ brokerContainer.withEnv("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+ brokerContainer.withEnv("brokerClientAuthenticationParameters", "token:" + superUserAuthToken);
}
ProxyContainer proxyContainer = pulsarCluster.getProxy();
@@ -201,4 +203,48 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
// Expected
}
}
+
+ @Test
+ public void testProxyRedirectWithTokenAuth() throws Exception {
+
+ final String tenant = "token-test-tenant" + randomName(4);
+ final String namespace = tenant + "/ns-1";
+ final String topic = namespace + "/my-topic-1";
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+ .authentication(AuthenticationFactory.token(superUserAuthToken))
+ .build();
+
+ try {
+ admin.tenants().createTenant(tenant,
+ new TenantInfo(Collections.singleton(REGULAR_USER_ROLE),
+ Collections.singleton(pulsarCluster.getClusterName())));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName()));
+ admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE,
+ EnumSet.allOf(AuthAction.class));
+
+ admin.topics().createPartitionedTopic(topic, 16);
+
+ // Create the partitions
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .authentication(AuthenticationFactory.token(REGULAR_USER_ROLE))
+ .build();
+
+ // Force the topics to be created
+ client.newProducer()
+ .topic(topic)
+ .create()
+ .close();
+
+ admin.topics().getList(namespace);
+ }
}