You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/01/31 00:09:39 UTC
[incubator-pulsar] branch master updated: Making Pulsar Proxy more secure (#1002)
This is an automated email from the ASF dual-hosted git repository.
jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dde5865 Making Pulsar Proxy more secure (#1002)
dde5865 is described below
commit dde58652aa441afb86e4d9b0541df8222b648ab0
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Tue Jan 30 16:09:37 2018 -0800
Making Pulsar Proxy more secure (#1002)
---
.../broker/authorization/AuthorizationManager.java | 72 ++-
.../pulsar/broker/admin/PersistentTopics.java | 2 +-
.../apache/pulsar/broker/service/ServerCnx.java | 635 ++++++++++++---------
.../org/apache/pulsar/common/api/Commands.java | 54 +-
.../apache/pulsar/common/api/proto/PulsarApi.java | 188 ++++++
pulsar-common/src/main/proto/PulsarApi.proto | 9 +-
.../pulsar/proxy/server/LookupProxyHandler.java | 167 ++++--
.../pulsar/proxy/server/ProxyConfiguration.java | 22 +-
.../apache/pulsar/proxy/server/ProxyService.java | 31 +-
.../pulsar/proxy/server/ProxyServiceStarter.java | 9 +-
.../ProxyAuthenticatedProducerConsumerTest.java | 7 +-
.../org/apache/pulsar/proxy/server/ProxyTest.java | 5 +
.../apache/pulsar/proxy/server/ProxyTlsTest.java | 5 +-
...ava => ProxyWithProxyAuthorizationNegTest.java} | 127 +++--
...t.java => ProxyWithProxyAuthorizationTest.java} | 107 ++--
....java => ProxyWithoutServiceDiscoveryTest.java} | 35 +-
.../broker-cert.pem | 72 +++
.../ProxyWithProxyAuthorizationTest/broker-key.pem | 28 +
.../tls/ProxyWithProxyAuthorizationTest/cacert.pem | 62 ++
.../client-cert.pem | 72 +++
.../ProxyWithProxyAuthorizationTest/client-key.pem | 28 +
.../ProxyWithProxyAuthorizationTest/proxy-cert.pem | 72 +++
.../ProxyWithProxyAuthorizationTest/proxy-key.pem | 28 +
23 files changed, 1349 insertions(+), 488 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index 9fa31cc..2bf7ce6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.broker.authorization;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
@@ -66,8 +69,8 @@ public class AuthorizationManager {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
- log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
- destination, e);
+ log.warn("Producer-client with Role - {} failed to get permissions for destination - {}. {}", role,
+ destination, e.getMessage());
throw e;
}
}
@@ -96,8 +99,9 @@ public class AuthorizationManager {
switch (policies.get().subscription_auth_mode) {
case Prefix:
if (!subscription.startsWith(role)) {
- PulsarServerException ex = new PulsarServerException(
- String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination));
+ PulsarServerException ex = new PulsarServerException(String.format(
+ "Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s",
+ role, destination));
permissionFuture.completeExceptionally(ex);
return;
}
@@ -111,13 +115,12 @@ public class AuthorizationManager {
permissionFuture.complete(isAuthorized);
});
}).exceptionally(ex -> {
- log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
- ex);
+ log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
- log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
+ log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
@@ -130,8 +133,8 @@ public class AuthorizationManager {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
- log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
- destination, e);
+ log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}. {}", role,
+ destination, e.getMessage());
throw e;
}
}
@@ -150,8 +153,46 @@ public class AuthorizationManager {
return canProduce(destination, role) || canConsume(destination, role, null);
}
- private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
- AuthAction action) {
+ /**
+ * Check whether the specified role can perform a lookup for the specified destination.
+ *
+ * For that the caller needs to have producer or consumer permission.
+ *
+ * @param destination
+ * @param role
+ * @return
+ * @throws Exception
+ */
+ public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role) {
+ CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
+ canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> {
+ if (ex == null) {
+ if (produceAuthorized) {
+ finalResult.complete(produceAuthorized);
+ return;
+ }
+ } else if (log.isDebugEnabled()) {
+ log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
+ destination.toString(), role, ex.getMessage());
+ }
+ canConsumeAsync(destination, role, null).whenComplete((consumeAuthorized, e) -> {
+ if (e == null) {
+ if (consumeAuthorized) {
+ finalResult.complete(consumeAuthorized);
+ return;
+ }
+ } else if (log.isDebugEnabled()) {
+ log.debug(
+ "Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
+ destination.toString(), role, e.getMessage());
+ }
+ finalResult.complete(false);
+ });
+ });
+ return finalResult;
+ }
+
+ private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role, AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
} else {
@@ -218,13 +259,13 @@ public class AuthorizationManager {
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
- log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
- ex);
+ log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+ ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
- log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
+ log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
@@ -244,8 +285,7 @@ public class AuthorizationManager {
}
// Suffix match
- if (permittedRole.charAt(0) == '*'
- && checkedRole.endsWith(permittedRole.substring(1))
+ if (permittedRole.charAt(0) == '*' && checkedRole.endsWith(permittedRole.substring(1))
&& permittedActions.contains(checkedAction)) {
return true;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index b7361b0..f241cf5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -1314,7 +1314,7 @@ public class PersistentTopics extends AdminResource {
validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, dn.toString());
- throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
+ throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, dn.toString(), authException.getMessage()));
}
} catch (Exception ex) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bd49fc7..8888c97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,7 +100,7 @@ public class ServerCnx extends PulsarHandler {
private String clientVersion = null;
private int nonPersistentPendingMessages = 0;
private final int MaxNonPersistentPendingMessages;
- private String originalPrincipal;
+ private String originalPrincipal = null;
enum State {
Start, Connected, Failed
@@ -187,73 +187,128 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();
- final String topic = lookup.getTopic();
+ final String topicName = lookup.getTopic();
if (log.isDebugEnabled()) {
- log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId);
+ log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId);
}
+
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
- getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
- if (ex == null) {
- ctx.writeAndFlush(lookupResponse);
- } else {
- // it should never happen
- log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
- ctx.writeAndFlush(
- newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
- }
- lookupSemaphore.release();
- return null;
- });
+ final String originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal()
+ : this.originalPrincipal;
+ CompletableFuture<Boolean> isProxyAuthorizedFuture;
+ if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+ isProxyAuthorizedFuture = service.getAuthorizationManager()
+ .canLookupAsync(DestinationName.get(topicName), authRole);
+ } else {
+ isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+ }
+
+ isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+ if (isProxyAuthorized) {
+ lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topicName),
+ lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole,
+ lookup.getRequestId()).handle((lookupResponse, ex) -> {
+ if (ex == null) {
+ ctx.writeAndFlush(lookupResponse);
+ } else {
+ // it should never happen
+ log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topicName,
+ ex.getMessage(), ex);
+ ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+ ex.getMessage(), requestId));
+ }
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ final String msg = "Proxy Client is not authorized to Lookup";
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+ lookupSemaphore.release();
+ }
+ return null;
+ }).exceptionally(ex -> {
+ final String msg = "Exception occured while trying to authorize lookup";
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+ lookupSemaphore.release();
+ return null;
+ });
} else {
if (log.isDebugEnabled()) {
- log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topic);
+ log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topicName);
}
ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}
-
}
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
final long requestId = partitionMetadata.getRequestId();
- final String topic = partitionMetadata.getTopic();
+ final String topicName = partitionMetadata.getTopic();
if (log.isDebugEnabled()) {
- log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topic, remoteAddress, requestId);
+ log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topicName, remoteAddress, requestId);
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
- getPartitionedTopicMetadata(getBrokerService().pulsar(),
- originalPrincipal != null ? originalPrincipal : authRole, DestinationName.get(topic))
- .handle((metadata, ex) -> {
- if (ex == null) {
- int partitions = metadata.partitions;
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
- } else {
- if (ex instanceof PulsarClientException) {
- log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
- remoteAddress, topic, ex.getMessage());
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
- ServerError.AuthorizationError, ex.getMessage(), requestId));
+
+ final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
+ ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
+ CompletableFuture<Boolean> isProxyAuthorizedFuture;
+ if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+ isProxyAuthorizedFuture = service.getAuthorizationManager()
+ .canLookupAsync(DestinationName.get(topicName), authRole);
+ } else {
+ isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+ }
+ isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+ if (isProxyAuthorized) {
+ getPartitionedTopicMetadata(getBrokerService().pulsar(),
+ originalPrincipal != null ? originalPrincipal : authRole,
+ DestinationName.get(topicName)).handle((metadata, ex) -> {
+ if (ex == null) {
+ int partitions = metadata.partitions;
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
} else {
- log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
- ex.getMessage(), ex);
- ServerError error = (ex instanceof RestException)
- && ((RestException) ex).getResponse().getStatus() < 500
- ? ServerError.MetadataError : ServerError.ServiceNotReady;
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error, ex.getMessage(),
- requestId));
+ if (ex instanceof PulsarClientException) {
+ log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
+ remoteAddress, topicName, ex.getMessage());
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
+ ServerError.AuthorizationError, ex.getMessage(), requestId));
+ } else {
+ log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
+ topicName, ex.getMessage(), ex);
+ ServerError error = (ex instanceof RestException)
+ && ((RestException) ex).getResponse().getStatus() < 500
+ ? ServerError.MetadataError : ServerError.ServiceNotReady;
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
+ ex.getMessage(), requestId));
+ }
}
- }
- lookupSemaphore.release();
- return null;
- });
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ final String msg = "Proxy Client is not authorized to Get Partition Metadata";
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ ctx.writeAndFlush(
+ Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
+ lookupSemaphore.release();
+ }
+ return null;
+ }).exceptionally(ex -> {
+ final String msg = "Exception occured while trying to authorize get Partition Metadata";
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+ lookupSemaphore.release();
+ return null;
+ });
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", remoteAddress,
- topic);
+ topicName);
}
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
@@ -330,11 +385,12 @@ public class ServerCnx extends PulsarHandler {
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
+
originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
authRole = getBrokerService().getAuthenticationService()
.authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
- log.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, authRole);
+ log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
} catch (AuthenticationException e) {
String msg = "Unable to authenticate";
log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
@@ -358,119 +414,140 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
checkArgument(state == State.Connected);
- CompletableFuture<Boolean> authorizationFuture;
- if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
- DestinationName.get(subscribe.getTopic()),
- originalPrincipal != null ? originalPrincipal : authRole,
- subscribe.getSubscription());
- } else {
- authorizationFuture = CompletableFuture.completedFuture(true);
- }
final String topicName = subscribe.getTopic();
- final String subscriptionName = subscribe.getSubscription();
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
- final SubType subType = subscribe.getSubType();
- final String consumerName = subscribe.getConsumerName();
- final boolean isDurable = subscribe.getDurable();
- final MessageIdImpl startMessageId = subscribe.hasStartMessageId()
- ? new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(),
- subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(),
- subscribe.getStartMessageId().getBatchIndex())
- : null;
-
- final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
- final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
-
- authorizationFuture.thenApply(isAuthorized -> {
- if (isAuthorized) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
- }
-
- log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
- try {
- Metadata.validateMetadata(metadata);
- } catch (IllegalArgumentException iae) {
- final String msg = iae.getMessage();
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
- return null;
+ CompletableFuture<Boolean> isProxyAuthorizedFuture;
+ if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+ isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
+ authRole, subscribe.getSubscription());
+ } else {
+ isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+ }
+ isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+ if (isProxyAuthorized) {
+ CompletableFuture<Boolean> authorizationFuture;
+ if (service.isAuthorizationEnabled()) {
+ authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
+ DestinationName.get(subscribe.getTopic()),
+ originalPrincipal != null ? originalPrincipal : authRole, subscribe.getSubscription());
+ } else {
+ authorizationFuture = CompletableFuture.completedFuture(true);
}
- CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
- CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture);
-
- if (existingConsumerFuture != null) {
- if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
- Consumer consumer = existingConsumerFuture.getNow(null);
- log.info("[{}] Consumer with the same id is already created: {}", remoteAddress, consumer);
- ctx.writeAndFlush(Commands.newSuccess(requestId));
- return null;
- } else {
- // There was an early request to create a consumer with same consumerId. This can happen when
- // client timeout is lower the broker timeouts. We need to wait until the previous consumer
- // creation request either complete or fails.
- log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName,
- subscriptionName);
- ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
- : getErrorCode(existingConsumerFuture);
- ctx.writeAndFlush(
- Commands.newError(requestId, error, "Consumer is already present on the connection"));
- return null;
- }
- }
+ final String subscriptionName = subscribe.getSubscription();
+ final SubType subType = subscribe.getSubType();
+ final String consumerName = subscribe.getConsumerName();
+ final boolean isDurable = subscribe.getDurable();
+ final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
+ subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
+ subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
+ : null;
+
+ final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
+ final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+
+ authorizationFuture.thenApply(isAuthorized -> {
+ if (isAuthorized) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
+ }
- service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName,
- consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
- .thenAccept(consumer -> {
- if (consumerFuture.complete(consumer)) {
- log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
- subscriptionName);
- ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise());
+ log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
+ try {
+ Metadata.validateMetadata(metadata);
+ } catch (IllegalArgumentException iae) {
+ final String msg = iae.getMessage();
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+ return null;
+ }
+ CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+ CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
+ consumerFuture);
+
+ if (existingConsumerFuture != null) {
+ if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
+ Consumer consumer = existingConsumerFuture.getNow(null);
+ log.info("[{}] Consumer with the same id is already created: {}", remoteAddress,
+ consumer);
+ ctx.writeAndFlush(Commands.newSuccess(requestId));
+ return null;
} else {
- // The consumer future was completed before by a close command
- try {
- consumer.close();
- log.info("[{}] Cleared consumer created after timeout on client side {}",
- remoteAddress, consumer);
- } catch (BrokerServiceException e) {
- log.warn("[{}] Error closing consumer created after timeout on client side {}: {}",
- remoteAddress, consumer, e.getMessage());
- }
- consumers.remove(consumerId, consumerFuture);
+ // There was an early request to create a consumer with same consumerId. This can happen
+ // when
+ // client timeout is lower the broker timeouts. We need to wait until the previous
+ // consumer
+ // creation request either complete or fails.
+ log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress,
+ topicName, subscriptionName);
+ ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
+ : getErrorCode(existingConsumerFuture);
+ ctx.writeAndFlush(Commands.newError(requestId, error,
+ "Consumer is already present on the connection"));
+ return null;
}
+ }
- }) //
- .exceptionally(exception -> {
- if (exception.getCause() instanceof ConsumerBusyException) {
- if (log.isDebugEnabled()) {
- log.debug(
- "[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}",
- remoteAddress, topicName, subscriptionName,
- exception.getCause().getMessage());
- }
- } else {
- log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
- subscriptionName, exception.getCause().getMessage(), exception);
- }
+ service.getTopic(topicName)
+ .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
+ subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
+ .thenAccept(consumer -> {
+ if (consumerFuture.complete(consumer)) {
+ log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
+ subscriptionName);
+ ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise());
+ } else {
+ // The consumer future was completed before by a close command
+ try {
+ consumer.close();
+ log.info("[{}] Cleared consumer created after timeout on client side {}",
+ remoteAddress, consumer);
+ } catch (BrokerServiceException e) {
+ log.warn(
+ "[{}] Error closing consumer created after timeout on client side {}: {}",
+ remoteAddress, consumer, e.getMessage());
+ }
+ consumers.remove(consumerId, consumerFuture);
+ }
- // If client timed out, the future would have been completed by subsequent close. Send error
- // back to client, only if not completed already.
- if (consumerFuture.completeExceptionally(exception)) {
- ctx.writeAndFlush(Commands.newError(requestId,
- BrokerServiceException.getClientErrorCode(exception.getCause()),
- exception.getCause().getMessage()));
- }
- consumers.remove(consumerId, consumerFuture);
+ }) //
+ .exceptionally(exception -> {
+ if (exception.getCause() instanceof ConsumerBusyException) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}",
+ remoteAddress, topicName, subscriptionName,
+ exception.getCause().getMessage());
+ }
+ } else {
+ log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
+ subscriptionName, exception.getCause().getMessage(), exception);
+ }
- return null;
+ // If client timed out, the future would have been completed by subsequent close.
+ // Send error
+ // back to client, only if not completed already.
+ if (consumerFuture.completeExceptionally(exception)) {
+ ctx.writeAndFlush(Commands.newError(requestId,
+ BrokerServiceException.getClientErrorCode(exception.getCause()),
+ exception.getCause().getMessage()));
+ }
+ consumers.remove(consumerId, consumerFuture);
- });
+ return null;
+
+ });
+ } else {
+ String msg = "Client is not authorized to subscribe";
+ log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
+ }
+ return null;
+ });
} else {
- String msg = "Client is not authorized to subscribe";
- log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+ final String msg = "Proxy Client is not authorized to subscribe";
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
@@ -489,155 +566,171 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleProducer(final CommandProducer cmdProducer) {
checkArgument(state == State.Connected);
- CompletableFuture<Boolean> authorizationFuture;
- if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationManager().canProduceAsync(
- DestinationName.get(cmdProducer.getTopic()),
- originalPrincipal != null ? originalPrincipal : authRole);
- } else {
- authorizationFuture = CompletableFuture.completedFuture(true);
- }
-
- // Use producer name provided by client if present
- final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
- : service.generateUniqueProducerName();
final String topicName = cmdProducer.getTopic();
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
- final boolean isEncrypted = cmdProducer.getEncrypted();
- final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
- authorizationFuture.thenApply(isAuthorized -> {
- if (isAuthorized) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
- }
-
- try {
- Metadata.validateMetadata(metadata);
- } catch (IllegalArgumentException iae) {
- final String msg = iae.getMessage();
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
- return null;
- }
-
- CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
- CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);
-
- if (existingProducerFuture != null) {
- if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
- Producer producer = existingProducerFuture.getNow(null);
- log.info("[{}] Producer with the same id is already created: {}", remoteAddress, producer);
- ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
- return null;
- } else {
- // There was an early request to create a producer with
- // same producerId. This can happen when
- // client
- // timeout is lower the broker timeouts. We need to wait
- // until the previous producer creation
- // request
- // either complete or fails.
- ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
- : getErrorCode(existingProducerFuture);
- log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName);
- ctx.writeAndFlush(
- Commands.newError(requestId, error, "Producer is already present on the connection"));
- return null;
- }
+ CompletableFuture<Boolean> isProxyAuthorizedFuture; // stage 1 of 2: when originalPrincipal is set, the connection's own role (authRole) must itself be allowed to produce
+ if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+ isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
+ authRole);
+ } else {
+ isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+ }
+ isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+ if (isProxyAuthorized) {
+ CompletableFuture<Boolean> authorizationFuture; // stage 2 of 2: authorize originalPrincipal when present, otherwise authRole
+ if (service.isAuthorizationEnabled()) {
+ authorizationFuture = service.getAuthorizationManager().canProduceAsync(
+ DestinationName.get(cmdProducer.getTopic().toString()), // same String already held in topicName; toString() on a String is a no-op
+ originalPrincipal != null ? originalPrincipal : authRole);
+ } else {
+ authorizationFuture = CompletableFuture.completedFuture(true);
}
+ // Use producer name provided by client if present
+ final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
+ : service.generateUniqueProducerName();
+ final boolean isEncrypted = cmdProducer.getEncrypted();
+ final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer); // NOTE(review): the Metadata.validateMetadata(metadata) check removed by this hunk is not re-added anywhere on the new path - confirm this is intentional
+
+ authorizationFuture.thenApply(isAuthorized -> { // NOTE(review): no exceptionally() is attached to this inner chain, so a failed client authorization lookup appears to send no error response - verify
+ if (isAuthorized) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
+ }
+ CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
+ CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId,
+ producerFuture);
+
+ if (existingProducerFuture != null) {
+ if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
+ Producer producer = existingProducerFuture.getNow(null);
+ log.info("[{}] Producer with the same id is already created: {}", remoteAddress,
+ producer);
+ ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
+ return null;
+ } else {
+ // There was an earlier request to create a producer with
+ // the same producerId. This can happen when
+ // the client
+ // timeout is lower than the broker timeouts. We need to wait
+ // until the previous producer creation
+ // request
+ // either completes or fails.
+ ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
+ : getErrorCode(existingProducerFuture);
+ log.warn("[{}][{}] Producer is already present on the connection", remoteAddress,
+ topicName);
+ ctx.writeAndFlush(Commands.newError(requestId, error,
+ "Producer is already present on the connection"));
+ return null;
+ }
+ }
- log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
-
- service.getTopic(topicName).thenAccept((Topic topic) -> {
- // Before creating producer, check if backlog quota exceeded
- // on topic
- if (topic.isBacklogQuotaExceeded(producerName)) {
- IllegalStateException illegalStateException = new IllegalStateException(
- "Cannot create producer on topic with backlog quota exceeded");
- BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
- if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
- ctx.writeAndFlush(Commands.newError(requestId,
- ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage()));
- } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
- ctx.writeAndFlush(
- Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException,
+ log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
+
+ service.getTopic(topicName).thenAccept((Topic topic) -> {
+ // Before creating producer, check if backlog quota exceeded
+ // on topic
+ if (topic.isBacklogQuotaExceeded(producerName)) {
+ IllegalStateException illegalStateException = new IllegalStateException(
+ "Cannot create producer on topic with backlog quota exceeded");
+ BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
+ if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
+ ctx.writeAndFlush(
+ Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError,
+ illegalStateException.getMessage()));
+ } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
+ ctx.writeAndFlush(Commands.newError(requestId,
+ ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage()));
- }
- producerFuture.completeExceptionally(illegalStateException);
- producers.remove(producerId, producerFuture);
- return;
- }
+ }
+ producerFuture.completeExceptionally(illegalStateException);
+ producers.remove(producerId, producerFuture);
+ return;
+ }
- // Check whether the producer will publish encrypted messages or not
- if (topic.isEncryptionRequired() && !isEncrypted) {
- String msg = String.format("Encryption is required in %s", topicName);
- log.warn("[{}] {}", remoteAddress, msg);
- ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
- return;
- }
+ // Check whether the producer will publish encrypted messages or not
+ if (topic.isEncryptionRequired() && !isEncrypted) {
+ String msg = String.format("Encryption is required in %s", topicName);
+ log.warn("[{}] {}", remoteAddress, msg);
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+ return; // NOTE(review): unlike the quota branch above, producerFuture is neither completed nor removed from producers before returning - verify
+ }
- disableTcpNoDelayIfNeeded(topicName, producerName);
+ disableTcpNoDelayIfNeeded(topicName, producerName);
- Producer producer =
- new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted, metadata);
+ Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
+ isEncrypted, metadata);
- try {
- topic.addProducer(producer);
+ try {
+ topic.addProducer(producer);
- if (isActive()) {
- if (producerFuture.complete(producer)) {
- log.info("[{}] Created new producer: {}", remoteAddress, producer);
- ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
- producer.getLastSequenceId()));
- return;
- } else {
- // The producer's future was completed before by
- // a close command
- producer.closeNow();
- log.info("[{}] Cleared producer created after timeout on client side {}", remoteAddress,
- producer);
+ if (isActive()) {
+ if (producerFuture.complete(producer)) {
+ log.info("[{}] Created new producer: {}", remoteAddress, producer);
+ ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
+ producer.getLastSequenceId()));
+ return;
+ } else {
+ // The producer's future was completed before by
+ // a close command
+ producer.closeNow();
+ log.info("[{}] Cleared producer created after timeout on client side {}",
+ remoteAddress, producer);
+ }
+ } else {
+ producer.closeNow();
+ log.info("[{}] Cleared producer created after connection was closed: {}",
+ remoteAddress, producer);
+ producerFuture.completeExceptionally(
+ new IllegalStateException("Producer created after connection was closed"));
+ }
+ } catch (BrokerServiceException ise) {
+ log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
+ ise.getMessage());
+ ctx.writeAndFlush(Commands.newError(requestId,
+ BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
+ producerFuture.completeExceptionally(ise);
}
- } else {
- producer.closeNow();
- log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress,
- producer);
- producerFuture.completeExceptionally(
- new IllegalStateException("Producer created after connection was closed"));
- }
- } catch (BrokerServiceException ise) {
- log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
- ise.getMessage());
- ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise),
- ise.getMessage()));
- producerFuture.completeExceptionally(ise);
- }
- producers.remove(producerId, producerFuture);
- }).exceptionally(exception -> {
- Throwable cause = exception.getCause();
- if (!(cause instanceof ServiceUnitNotReadyException)) {
- // Do not print stack traces for expected exceptions
- log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
- }
+ producers.remove(producerId, producerFuture);
+ }).exceptionally(exception -> {
+ Throwable cause = exception.getCause();
+ if (!(cause instanceof ServiceUnitNotReadyException)) {
+ // Do not print stack traces for expected exceptions
+ log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
+ }
- // If client timed out, the future would have been completed
- // by subsequent close. Send error back to
- // client, only if not completed already.
- if (producerFuture.completeExceptionally(exception)) {
- ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause),
- cause.getMessage()));
- }
- producers.remove(producerId, producerFuture);
+ // If client timed out, the future would have been completed
+ // by subsequent close. Send error back to
+ // client, only if not completed already.
+ if (producerFuture.completeExceptionally(exception)) {
+ ctx.writeAndFlush(Commands.newError(requestId,
+ BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
+ }
+ producers.remove(producerId, producerFuture);
+ return null;
+ });
+ } else {
+ String msg = "Client is not authorized to Produce";
+ log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
+ }
return null;
});
} else {
- String msg = "Client is not authorized to Produce";
- log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+ final String msg = "Proxy Client is not authorized to Produce";
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
+ }).exceptionally(ex -> { // runs when the stage-1 future fails or the lambda above throws; failures inside the nested futures do not reach this handler
+ String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+ log.warn(msg);
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
+ return null;
});
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index e7b670a..7d847ee 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -449,16 +449,7 @@ public class Commands {
}
public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
- CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
- partitionMetadataBuilder.setTopic(topic);
- partitionMetadataBuilder.setRequestId(requestId);
-
- CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
- ByteBuf res = serializeWithSize(
- BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
- partitionMetadataBuilder.recycle();
- partitionMetadata.recycle();
- return res;
+ return Commands.newPartitionMetadataRequest(topic, requestId, null); // delegates to the three-argument overload; a null clientAuthRole leaves original_principal unset
}
public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
@@ -477,16 +468,7 @@ public class Commands {
}
public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
- CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
- lookupTopicBuilder.setTopic(topic);
- lookupTopicBuilder.setRequestId(requestId);
- lookupTopicBuilder.setAuthoritative(authoritative);
-
- CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
- ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
- lookupTopicBuilder.recycle();
- lookupBroker.recycle();
- return res;
+ return Commands.newLookup(topic, authoritative, null, requestId); // delegates to the four-argument overload; a null clientAuthRole leaves original_principal unset
}
public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
@@ -640,7 +622,7 @@ public class Commands {
}
static ByteBuf newPong() {
- return cmdPong.retainedDuplicate();
+ return cmdPong.retainedDuplicate(); // same statement as the removed line above; the visible text is identical, so this looks like a whitespace-only change
}
private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
@@ -891,4 +873,34 @@ public class Commands {
Crc32c,
None;
}
+
+ public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String clientAuthRole) { // builds a serialized PARTITIONED_METADATA command; clientAuthRole may be null
+ CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
+ partitionMetadataBuilder.setTopic(topic);
+ partitionMetadataBuilder.setRequestId(requestId);
+ if (clientAuthRole != null) { // the builder's setOriginalPrincipal throws NullPointerException on null, so the optional field is set only when a role is supplied
+ partitionMetadataBuilder.setOriginalPrincipal(clientAuthRole);
+ }
+ CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
+ ByteBuf res = serializeWithSize(
+ BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
+ partitionMetadataBuilder.recycle(); // builder and message are recycled only after the command has been serialized into res
+ partitionMetadata.recycle();
+ return res;
+ }
+
+ public static ByteBuf newLookup(String topic, boolean authoritative, String clientAuthRole, long requestId) { // builds a serialized LOOKUP command; clientAuthRole may be null
+ CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
+ lookupTopicBuilder.setTopic(topic);
+ lookupTopicBuilder.setRequestId(requestId);
+ lookupTopicBuilder.setAuthoritative(authoritative);
+ if (clientAuthRole != null) { // the builder's setOriginalPrincipal throws NullPointerException on null, so the optional field is set only when a role is supplied
+ lookupTopicBuilder.setOriginalPrincipal(clientAuthRole);
+ }
+ CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
+ ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
+ lookupTopicBuilder.recycle(); // builder and message are recycled only after the command has been serialized into res
+ lookupBroker.recycle();
+ return res;
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 6e344cb..eef3651 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6779,6 +6779,10 @@ public final class PulsarApi {
// required uint64 request_id = 2;
boolean hasRequestId();
long getRequestId();
+
+ // optional string original_principal = 3;
+ boolean hasOriginalPrincipal();
+ String getOriginalPrincipal();
}
public static final class CommandPartitionedTopicMetadata extends
com.google.protobuf.GeneratedMessageLite
@@ -6859,9 +6863,42 @@ public final class PulsarApi {
return requestId_;
}
+ // optional string original_principal = 3;
+ public static final int ORIGINAL_PRINCIPAL_FIELD_NUMBER = 3;
+ private java.lang.Object originalPrincipal_;
+ public boolean hasOriginalPrincipal() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public String getOriginalPrincipal() {
+ java.lang.Object ref = originalPrincipal_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ originalPrincipal_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getOriginalPrincipalBytes() {
+ java.lang.Object ref = originalPrincipal_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ originalPrincipal_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
topic_ = "";
requestId_ = 0L;
+ originalPrincipal_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6894,6 +6931,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt64(2, requestId_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, getOriginalPrincipalBytes());
+ }
}
private int memoizedSerializedSize = -1;
@@ -6910,6 +6950,10 @@ public final class PulsarApi {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, requestId_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, getOriginalPrincipalBytes());
+ }
memoizedSerializedSize = size;
return size;
}
@@ -7027,6 +7071,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000001);
requestId_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
+ originalPrincipal_ = "";
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -7068,6 +7114,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000002;
}
result.requestId_ = requestId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.originalPrincipal_ = originalPrincipal_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -7080,6 +7130,9 @@ public final class PulsarApi {
if (other.hasRequestId()) {
setRequestId(other.getRequestId());
}
+ if (other.hasOriginalPrincipal()) {
+ setOriginalPrincipal(other.getOriginalPrincipal());
+ }
return this;
}
@@ -7127,6 +7180,11 @@ public final class PulsarApi {
requestId_ = input.readUInt64();
break;
}
+ case 26: {
+ bitField0_ |= 0x00000004;
+ originalPrincipal_ = input.readBytes();
+ break;
+ }
}
}
}
@@ -7190,6 +7248,42 @@ public final class PulsarApi {
return this;
}
+ // optional string original_principal = 3;
+ private java.lang.Object originalPrincipal_ = "";
+ public boolean hasOriginalPrincipal() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public String getOriginalPrincipal() {
+ java.lang.Object ref = originalPrincipal_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ originalPrincipal_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setOriginalPrincipal(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ originalPrincipal_ = value;
+
+ return this;
+ }
+ public Builder clearOriginalPrincipal() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ originalPrincipal_ = getDefaultInstance().getOriginalPrincipal();
+
+ return this;
+ }
+ void setOriginalPrincipal(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000004;
+ originalPrincipal_ = value;
+
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandPartitionedTopicMetadata)
}
@@ -7865,6 +7959,10 @@ public final class PulsarApi {
// optional bool authoritative = 3 [default = false];
boolean hasAuthoritative();
boolean getAuthoritative();
+
+ // optional string original_principal = 4;
+ boolean hasOriginalPrincipal();
+ String getOriginalPrincipal();
}
public static final class CommandLookupTopic extends
com.google.protobuf.GeneratedMessageLite
@@ -7955,10 +8053,43 @@ public final class PulsarApi {
return authoritative_;
}
+ // optional string original_principal = 4;
+ public static final int ORIGINAL_PRINCIPAL_FIELD_NUMBER = 4;
+ private java.lang.Object originalPrincipal_;
+ public boolean hasOriginalPrincipal() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getOriginalPrincipal() {
+ java.lang.Object ref = originalPrincipal_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ originalPrincipal_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getOriginalPrincipalBytes() {
+ java.lang.Object ref = originalPrincipal_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ originalPrincipal_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
topic_ = "";
requestId_ = 0L;
authoritative_ = false;
+ originalPrincipal_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7994,6 +8125,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(3, authoritative_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBytes(4, getOriginalPrincipalBytes());
+ }
}
private int memoizedSerializedSize = -1;
@@ -8014,6 +8148,10 @@ public final class PulsarApi {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, authoritative_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, getOriginalPrincipalBytes());
+ }
memoizedSerializedSize = size;
return size;
}
@@ -8133,6 +8271,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000002);
authoritative_ = false;
bitField0_ = (bitField0_ & ~0x00000004);
+ originalPrincipal_ = "";
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -8178,6 +8318,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000004;
}
result.authoritative_ = authoritative_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.originalPrincipal_ = originalPrincipal_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -8193,6 +8337,9 @@ public final class PulsarApi {
if (other.hasAuthoritative()) {
setAuthoritative(other.getAuthoritative());
}
+ if (other.hasOriginalPrincipal()) {
+ setOriginalPrincipal(other.getOriginalPrincipal());
+ }
return this;
}
@@ -8245,6 +8392,11 @@ public final class PulsarApi {
authoritative_ = input.readBool();
break;
}
+ case 34: {
+ bitField0_ |= 0x00000008;
+ originalPrincipal_ = input.readBytes();
+ break;
+ }
}
}
}
@@ -8329,6 +8481,42 @@ public final class PulsarApi {
return this;
}
+ // optional string original_principal = 4;
+ private java.lang.Object originalPrincipal_ = "";
+ public boolean hasOriginalPrincipal() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getOriginalPrincipal() {
+ java.lang.Object ref = originalPrincipal_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ originalPrincipal_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setOriginalPrincipal(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ originalPrincipal_ = value;
+
+ return this;
+ }
+ public Builder clearOriginalPrincipal() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ originalPrincipal_ = getDefaultInstance().getOriginalPrincipal();
+
+ return this;
+ }
+ void setOriginalPrincipal(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000008;
+ originalPrincipal_ = value;
+
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandLookupTopic)
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 6c86530..faa894f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -72,9 +72,9 @@ message MessageMetadata {
// differentiate single and batch message metadata
optional int32 num_messages_in_batch = 11 [default = 1];
- // the timestamp that this event occurs. it is typically set by applications.
- // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
- optional uint64 event_time = 12 [default = 0];
+ // the timestamp that this event occurs. it is typically set by applications.
+ // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+ optional uint64 event_time = 12 [default = 0];
// Contains encryption key name, encrypted key and metadata to describe the key
repeated EncryptionKeys encryption_keys = 13;
// Algorithm used to encrypt data key
@@ -186,6 +186,7 @@ message CommandSubscribe {
message CommandPartitionedTopicMetadata {
required string topic = 1;
required uint64 request_id = 2;
+ optional string original_principal = 3; // filled in by Commands.newPartitionMetadataRequest only when a non-null clientAuthRole is passed
}
message CommandPartitionedTopicMetadataResponse {
@@ -198,13 +199,13 @@ message CommandPartitionedTopicMetadataResponse {
optional LookupType response = 3;
optional ServerError error = 4;
optional string message = 5;
-
}
message CommandLookupTopic {
required string topic = 1;
required uint64 request_id = 2;
optional bool authoritative = 3 [default = false];
+ optional string original_principal = 4; // filled in by Commands.newLookup only when a non-null clientAuthRole is passed (the proxy passes proxyConnection.clientAuthRole when authentication is enabled)
}
message CommandLookupTopicResponse {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 9dc876f..5d35b86 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -33,7 +33,9 @@ import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
import io.prometheus.client.Counter;
+import static org.apache.commons.lang3.StringUtils.isBlank;
public class LookupProxyHandler {
private final ProxyService service;
@@ -41,6 +43,7 @@ public class LookupProxyHandler {
private final boolean connectWithTLS;
private SocketAddress clientAddress;
+ private String brokerServiceURL;
private static final Counter lookupRequests = Counter
.build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests").create().register();
@@ -54,6 +57,8 @@ public class LookupProxyHandler {
this.proxyConnection = proxyConnection;
this.clientAddress = proxyConnection.clientAddress();
this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
+ this.brokerServiceURL = this.connectWithTLS ? proxy.getConfiguration().getBrokerServiceURLTLS()
+ : proxy.getConfiguration().getBrokerServiceURL();
}
public void handleLookup(CommandLookupTopic lookup) {
@@ -64,20 +69,24 @@ public class LookupProxyHandler {
lookupRequests.inc();
long clientRequestId = lookup.getRequestId();
String topic = lookup.getTopic();
-
- ServiceLookupData availableBroker = null;
- try {
- availableBroker = service.getDiscoveryProvider().nextBroker();
- } catch (Exception e) {
- log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
- proxyConnection.ctx().writeAndFlush(
- Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
- return;
+ String serviceUrl;
+ if (isBlank(brokerServiceURL)) {
+ ServiceLookupData availableBroker = null;
+ try {
+ availableBroker = service.getDiscoveryProvider().nextBroker();
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
+ return;
+ }
+ serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
+ : availableBroker.getPulsarServiceUrl();
+ } else {
+ serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
+ : service.getConfiguration().getBrokerServiceURL();
}
-
- performLookup(clientRequestId, topic,
- this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(),
- false, 10);
+ performLookup(clientRequestId, topic, serviceUrl, false, 10);
}
private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative,
@@ -99,31 +108,41 @@ public class LookupProxyHandler {
InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
if (log.isDebugEnabled()) {
- log.debug("Getting connections to '{}'", addr);
+ log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, topic,
+ clientRequestId);
}
service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
// Connected to backend broker
long requestId = service.newRequestId();
- clientCnx.newLookup(Commands.newLookup(topic, authoritative, requestId), requestId).thenAccept(result -> {
- if (result.redirect) {
- // Need to try the lookup again on a different broker
- performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
- } else {
- // We have the result immediately
- String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
-
- // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS between proxy
- // and broker is independent of whether the client itself uses TLS, but we need to force the client
- // to use the appropriate target broker (and port) when it will connect back.
- proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
- LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
- }
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
- proxyConnection.ctx().writeAndFlush(
- Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
- return null;
- });
+ ByteBuf command;
+ if (service.getConfiguration().isAuthenticationEnabled()) {
+ command = Commands.newLookup(topic, authoritative, proxyConnection.clientAuthRole, requestId);
+ } else {
+ command = Commands.newLookup(topic, authoritative, requestId);
+ }
+ clientCnx.newLookup(command,
+ requestId).thenAccept(result -> {
+ if (result.redirect) {
+ // Need to try the lookup again on a different broker
+ performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
+ } else {
+ // We have the result immediately
+ String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
+
+ // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
+ // between proxy
+ // and broker is independent of whether the client itself uses TLS, but we need to force the
+ // client
+ // to use the appropriate target broker (and port) when it will connect back.
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
+ LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
+ }
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
+ proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+ ex.getMessage(), clientRequestId));
+ return null;
+ });
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(
@@ -132,30 +151,74 @@ public class LookupProxyHandler {
});
}
- void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
+ /**
+ * Answers a client's partitioned-topic-metadata request.
+ *
+ * When {@code brokerServiceURL} is blank the partition count is obtained from the discovery
+ * provider. Otherwise the request is forwarded to the broker behind {@code brokerServiceURL}
+ * (carrying the client's auth role when authentication is enabled) and the broker's answer
+ * is relayed. Every outcome, success or failure, is written back to the client connection
+ * using the request id the client sent.
+ *
+ * @param partitionMetadata the request received from the client; supplies topic and request id
+ */
+ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
partitionsMetadataRequests.inc();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
}
- final long requestId = partitionMetadata.getRequestId();
+ final long clientRequestId = partitionMetadata.getRequestId();
DestinationName dn = DestinationName.get(partitionMetadata.getTopic());
-
- service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
- .thenAccept(metadata -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Total number of partitions for topic {} is {}", proxyConnection.clientAuthRole,
- dn, metadata.partitions);
- }
- proxyConnection.ctx()
- .writeAndFlush(Commands.newPartitionMetadataResponse(metadata.partitions, requestId));
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, dn,
- ex.getMessage(), ex);
- proxyConnection.ctx().writeAndFlush(Commands
- .newPartitionMetadataResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
- return null;
- });
+ if (isBlank(brokerServiceURL)) {
+ // No fixed broker configured: resolve the metadata through the discovery provider.
+ service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
+ .thenAccept(metadata -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Total number of partitions for topic {} is {}",
+ proxyConnection.clientAuthRole, dn, metadata.partitions);
+ }
+ proxyConnection.ctx().writeAndFlush(
+ Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId));
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, dn,
+ ex.getMessage(), ex);
+ proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
+ ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+ return null;
+ });
+ } else {
+ URI brokerURI;
+ try {
+ brokerURI = new URI(brokerServiceURL);
+ } catch (URISyntaxException e) {
+ proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+ e.getMessage(), clientRequestId));
+ return;
+ }
+ InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
+ dn.getPartitionedTopicName(), clientRequestId);
+ }
+
+ service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+ // Connected to backend broker
+ // The proxy->broker exchange gets its own request id; the client's id is only used in replies.
+ long requestId = service.newRequestId();
+ ByteBuf command;
+ if (service.getConfiguration().isAuthenticationEnabled()) {
+ command = Commands.newPartitionMetadataRequest(dn.toString(), requestId, proxyConnection.clientAuthRole);
+ } else {
+ command = Commands.newPartitionMetadataRequest(dn.toString(), requestId);
+ }
+ clientCnx.newLookup(
+ command,
+ requestId).thenAccept(lookupDataResult -> {
+ proxyConnection.ctx().writeAndFlush(Commands
+ .newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
+ }).exceptionally((ex) -> {
+ // The failure may or may not be wrapped; dereferencing a null cause would throw inside
+ // this handler and the client would never receive a reply.
+ Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+ log.warn("[{}] failed to get Partitioned metadata : {}", dn.toString(),
+ cause.getMessage(), ex);
+ // Reply with a partition-metadata error, like every other failure path of this method.
+ proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
+ ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ // Failed to connect to backend broker
+ proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+ ex.getMessage(), clientRequestId));
+ return null;
+ });
+ }
}
private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 0d0f160..1c45c4f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -34,7 +34,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
// ZooKeeper session timeout
private int zookeeperSessionTimeoutMs = 30_000;
-
+
+ // If service discovery is disabled, this URL should point to the discovery service provider.
+ private String brokerServiceURL;
+ private String brokerServiceURLTLS;
+
// Port to use to server binary-proto request
private int servicePort = 6650;
// Port to use to server binary-proto-tls request
@@ -78,6 +82,22 @@ public class ProxyConfiguration implements PulsarConfiguration {
private Properties properties = new Properties();
+ /** Returns the TLS broker service URL held by this configuration (no default is assigned at declaration). */
+ public String getBrokerServiceURLTLS() {
+ return this.brokerServiceURLTLS;
+ }
+
+ /** Stores the TLS broker service URL. The parameter is named "discovery…" but the value is kept in {@code brokerServiceURLTLS}. */
+ public void setBrokerServiceURLTLS(String discoveryServiceURLTLS) {
+ this.brokerServiceURLTLS = discoveryServiceURLTLS;
+ }
+
+ /** Returns the broker service URL held by this configuration (no default is assigned at declaration). */
+ public String getBrokerServiceURL() {
+ return this.brokerServiceURL;
+ }
+
+ /** Stores the broker service URL. The parameter is named "discovery…" but the value is kept in {@code brokerServiceURL}. */
+ public void setBrokerServiceURL(String discoveryServiceURL) {
+ this.brokerServiceURL = discoveryServiceURL;
+ }
+
public String getZookeeperServers() {
return zookeeperServers;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index fed6e15..e5c2e91 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import java.io.Closeable;
import java.io.IOException;
@@ -110,21 +111,23 @@ public class ProxyService implements Closeable {
}
public void start() throws Exception {
- localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
- proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs());
- localZooKeeperConnectionService.start(new ShutdownService() {
- @Override
- public void shutdown(int exitCode) {
- LOG.error("Lost local ZK session. Shutting down the proxy");
- Runtime.getRuntime().halt(-1);
- }
- });
-
- discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
- this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig);
authenticationService = new AuthenticationService(serviceConfiguration);
- authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+
+ if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getGlobalZookeeperServers())) {
+ localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
+ proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs());
+ localZooKeeperConnectionService.start(new ShutdownService() {
+ @Override
+ public void shutdown(int exitCode) {
+ LOG.error("Lost local ZK session. Shutting down the proxy");
+ Runtime.getRuntime().halt(-1);
+ }
+ });
+ discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
+ this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
+ authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+ }
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
@@ -145,7 +148,7 @@ public class ProxyService implements Closeable {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true));
tlsBootstrap.bind(proxyConfig.getServicePortTls()).sync();
- LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getWebServicePortTls());
+ LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getServicePortTls());
}
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index b7e9394..d9fc4af 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -88,8 +88,13 @@ public class ProxyServiceStarter {
config.setGlobalZookeeperServers(globalZookeeperServers);
}
- checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
- checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided");
+ if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
+ || config.isAuthorizationEnabled()) {
+ checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
+ checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided");
+ }
+
+ java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
// create broker service
ProxyService discoveryService = new ProxyService(config);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 556a8b5..c62bbc1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -61,7 +61,8 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
-
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
+
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
private final String configClusterName = "use";
@@ -117,10 +118,12 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
proxyConfig.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
proxyConfig.setAuthenticationProviders(providers);
+
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
proxyService = Mockito.spy(new ProxyService(proxyConfig));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
-
proxyService.start();
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 17696c7..05d79ef 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -40,6 +40,8 @@ import org.testng.annotations.Test;
public class ProxyTest extends MockedPulsarServiceBaseTest {
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
+
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
@@ -49,6 +51,9 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
internalSetup();
proxyConfig.setServicePort(PortManager.nextFreePort());
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+
proxyService = Mockito.spy(new ProxyService(proxyConfig));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 89426d5..a87ef09 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -43,7 +43,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
-
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
@@ -61,6 +61,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
proxyConfig.setTlsEnabledWithBroker(false);
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+
proxyService = Mockito.spy(new ProxyService(proxyConfig));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
similarity index 57%
copy from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
copy to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
index 556a8b5..04717ce 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.HashSet;
@@ -37,8 +36,10 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.mockito.Mockito;
@@ -53,18 +54,24 @@ import org.testng.collections.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
- private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticatedProducerConsumerTest.class);
-
- private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
- private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
- private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
- private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
- private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
-
+public class ProxyWithProxyAuthorizationNegTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(ProxyWithProxyAuthorizationNegTest.class);
+
+ private final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+ private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem";
+ private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem";
+ private final String TLS_SERVER_CERT_TRUST_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+ private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem";
+ private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem";
+ private final String TLS_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+ private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem";
+ private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem";
+ private final String TLS_SUPERUSER_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+ private final String TLS_SUPERUSER_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+ private final String TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
- private final String configClusterName = "use";
@BeforeMethod
@Override
@@ -75,32 +82,33 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
conf.setAuthorizationEnabled(true);
conf.setTlsEnabled(true);
- conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ conf.setTlsTrustCertsFilePath(TLS_SERVER_CERT_TRUST_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
Set<String> superUserRoles = new HashSet<>();
- superUserRoles.add("localhost");
superUserRoles.add("superUser");
conf.setSuperUserRoles(superUserRoles);
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
conf.setBrokerClientAuthenticationParameters(
- "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+ "tlsCertFile:" + TLS_SERVER_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName(configClusterName);
+ conf.setClusterName("proxy-authorization-neg");
super.init();
// start proxy service
proxyConfig.setAuthenticationEnabled(true);
- proxyConfig.setAuthenticationEnabled(true);
-
+ proxyConfig.setAuthorizationEnabled(false);
+ proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+ proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);
+
proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
@@ -109,17 +117,16 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
- proxyConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- proxyConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+ proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+ proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
- "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+ "tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
proxyConfig.setAuthenticationProviders(providers);
-
+
proxyService = Mockito.spy(new ProxyService(proxyConfig));
- doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
proxyService.start();
}
@@ -146,32 +153,46 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
* @throws Exception
*/
@Test
- public void testTlsSyncProducerAndConsumer() throws Exception {
+ public void testProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
+ createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
- Map<String, String> authParams = Maps.newHashMap();
- authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
- authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
- Authentication authTls = new AuthenticationTls();
- authTls.configure(authParams);
// create a client which connects to proxy over tls and pass authData
- PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
- admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
- "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
+ String namespaceName = "my-property/proxy-authorization-neg/my-ns";
+
admin.properties().createProperty("my-property",
- new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
- admin.namespaces().createNamespace("my-property/use/my-ns");
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization-neg")));
+ admin.namespaces().createNamespace(namespaceName);
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
- Consumer consumer = proxyClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
- conf);
-
+ Consumer consumer;
+ try {
+ consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", "my-subscriber-name",
+ conf);
+ } catch (Exception ex) {
+ // expected
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.consume));
+ log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName));
+ consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", "my-subscriber-name",
+ conf);
+ }
ProducerConfiguration producerConf = new ProducerConfiguration();
-
- Producer producer = proxyClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
+ Producer producer;
+ try {
+ producer = proxyClient.createProducer("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", producerConf);
+ } catch(Exception ex) {
+ // expected
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+ producer = proxyClient.createProducer("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", producerConf);
+ }
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
String message = "my-message-" + i;
@@ -196,16 +217,34 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
log.info("-- Exiting {} test --", methodName);
}
- protected final PulsarClient createPulsarClient(Authentication auth, String lookupUrl) throws Exception {
+ /**
+ * Builds the admin client used by this test. It authenticates over TLS with the
+ * TLS_SUPERUSER_CLIENT_* certificate/key pair and stores a Mockito spy of the
+ * resulting PulsarAdmin (pointed at brokerUrlTls) in the {@code admin} field.
+ */
+ protected final void createAdminClient() throws Exception {
+ Map<String, String> authParams = Maps.newHashMap();
+ authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_SUPERUSER_CLIENT_KEY_FILE_PATH);
+ Authentication authTls = new AuthenticationTls();
+ authTls.configure(authParams);
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
- clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ clientConf.setTlsTrustCertsFilePath(TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH);
clientConf.setTlsAllowInsecureConnection(true);
- clientConf.setAuthentication(auth);
+ clientConf.setAuthentication(authTls);
clientConf.setUseTls(true);
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
- return PulsarClient.create(lookupUrl, clientConf);
}
-
+
+ /**
+ * Creates a Pulsar client for the given proxy URL. The client uses TLS and authenticates
+ * with the TLS_CLIENT_* certificate/key pair; its periodic stats reporting is switched off.
+ *
+ * @param proxyServiceUrl service URL of the proxy to connect to
+ * @return a client configured as described above
+ * @throws PulsarClientException if the client cannot be created
+ */
+ private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException {
+ // Client-side TLS credentials.
+ Map<String, String> tlsParams = Maps.newHashMap();
+ tlsParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ tlsParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ Authentication clientAuth = new AuthenticationTls();
+ clientAuth.configure(tlsParams);
+
+ // Connection settings for talking to the proxy.
+ org.apache.pulsar.client.api.ClientConfiguration proxyClientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+ proxyClientConf.setUseTls(true);
+ proxyClientConf.setTlsAllowInsecureConnection(true);
+ proxyClientConf.setTlsTrustCertsFilePath(TLS_CLIENT_TRUST_CERT_FILE_PATH);
+ proxyClientConf.setAuthentication(clientAuth);
+ proxyClientConf.setStatsInterval(0, TimeUnit.SECONDS);
+ return PulsarClient.create(proxyServiceUrl, proxyClientConf);
+ }
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
similarity index 63%
copy from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
copy to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
index 556a8b5..558f5e0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.proxy.server;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -37,8 +38,10 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.mockito.Mockito;
@@ -53,18 +56,24 @@ import org.testng.collections.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
- private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticatedProducerConsumerTest.class);
-
- private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
- private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
- private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
- private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
- private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
-
+public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(ProxyWithProxyAuthorizationTest.class);
+
+ private final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+ private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem";
+ private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem";
+ private final String TLS_SERVER_CERT_TRUST_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+ private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem";
+ private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem";
+ private final String TLS_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+ private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem";
+ private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem";
+ private final String TLS_SUPERUSER_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+ private final String TLS_SUPERUSER_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+ private final String TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
- private final String configClusterName = "use";
@BeforeMethod
@Override
@@ -75,32 +84,33 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
conf.setAuthorizationEnabled(true);
conf.setTlsEnabled(true);
- conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ conf.setTlsTrustCertsFilePath(TLS_SERVER_CERT_TRUST_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
Set<String> superUserRoles = new HashSet<>();
- superUserRoles.add("localhost");
superUserRoles.add("superUser");
conf.setSuperUserRoles(superUserRoles);
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
conf.setBrokerClientAuthenticationParameters(
- "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+ "tlsCertFile:" + TLS_SERVER_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName(configClusterName);
+ conf.setClusterName("proxy-authorization");
super.init();
// start proxy service
proxyConfig.setAuthenticationEnabled(true);
- proxyConfig.setAuthenticationEnabled(true);
-
+ proxyConfig.setAuthorizationEnabled(false);
+ proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+ proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);
+
proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
@@ -109,17 +119,16 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
proxyConfig.setTlsEnabledWithBroker(true);
// enable tls and auth&auth at proxy
- proxyConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
- proxyConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+ proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+ proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
- "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+ "tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
proxyConfig.setAuthenticationProviders(providers);
-
+
proxyService = Mockito.spy(new ProxyService(proxyConfig));
- doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
proxyService.start();
}
@@ -146,32 +155,32 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
* @throws Exception
*/
@Test
- public void testTlsSyncProducerAndConsumer() throws Exception {
+ public void textProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
+ createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
- Map<String, String> authParams = Maps.newHashMap();
- authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
- authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
- Authentication authTls = new AuthenticationTls();
- authTls.configure(authParams);
// create a client which connects to proxy over tls and pass authData
- PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
- admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
- "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
+ String namespaceName = "my-property/proxy-authorization/my-ns";
+
admin.properties().createProperty("my-property",
- new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
- admin.namespaces().createNamespace("my-property/use/my-ns");
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+ admin.namespaces().createNamespace(namespaceName);
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
- Consumer consumer = proxyClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+ Consumer consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1", "my-subscriber-name",
conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
- Producer producer = proxyClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
+ Producer producer = proxyClient.createProducer("persistent://my-property/proxy-authorization/my-ns/my-topic1", producerConf);
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
String message = "my-message-" + i;
@@ -196,16 +205,34 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
log.info("-- Exiting {} test --", methodName);
}
- protected final PulsarClient createPulsarClient(Authentication auth, String lookupUrl) throws Exception {
+ protected final void createAdminClient() throws Exception {
+ Map<String, String> authParams = Maps.newHashMap();
+ authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_SUPERUSER_CLIENT_KEY_FILE_PATH);
+ Authentication authTls = new AuthenticationTls();
+ authTls.configure(authParams);
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
- clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ clientConf.setTlsTrustCertsFilePath(TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH);
clientConf.setTlsAllowInsecureConnection(true);
- clientConf.setAuthentication(auth);
+ clientConf.setAuthentication(authTls);
clientConf.setUseTls(true);
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
- return PulsarClient.create(lookupUrl, clientConf);
}
-
+
+ private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException {
+ Map<String, String> authParams = Maps.newHashMap();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ Authentication authTls = new AuthenticationTls();
+ authTls.configure(authParams);
+ org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+ clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+ clientConf.setTlsTrustCertsFilePath(TLS_CLIENT_TRUST_CERT_FILE_PATH);
+ clientConf.setTlsAllowInsecureConnection(true);
+ clientConf.setAuthentication(authTls);
+ clientConf.setUseTls(true);
+ return PulsarClient.create(proxyServiceUrl, clientConf);
+ }
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
similarity index 89%
copy from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
copy to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 556a8b5..6b47d2d 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.HashSet;
@@ -44,7 +43,6 @@ import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -53,8 +51,10 @@ import org.testng.collections.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
- private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticatedProducerConsumerTest.class);
+import org.testng.Assert;
+
+public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class);
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
@@ -64,7 +64,6 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
- private final String configClusterName = "use";
@BeforeMethod
@Override
@@ -72,7 +71,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
// enable tls and auth&auth at broker
conf.setAuthenticationEnabled(true);
- conf.setAuthorizationEnabled(true);
+ conf.setAuthorizationEnabled(false);
conf.setTlsEnabled(true);
conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
@@ -81,7 +80,6 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
conf.setTlsAllowInsecureConnection(true);
Set<String> superUserRoles = new HashSet<>();
- superUserRoles.add("localhost");
superUserRoles.add("superUser");
conf.setSuperUserRoles(superUserRoles);
@@ -93,14 +91,16 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName(configClusterName);
+ conf.setClusterName("without-service-discovery");
super.init();
// start proxy service
proxyConfig.setAuthenticationEnabled(true);
- proxyConfig.setAuthenticationEnabled(true);
-
+ proxyConfig.setAuthorizationEnabled(false);
+ proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+ proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);
+
proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
@@ -117,9 +117,8 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
proxyConfig.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
proxyConfig.setAuthenticationProviders(providers);
-
+
proxyService = Mockito.spy(new ProxyService(proxyConfig));
- doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
proxyService.start();
}
@@ -146,7 +145,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
* @throws Exception
*/
@Test
- public void testTlsSyncProducerAndConsumer() throws Exception {
+ public void testDiscoveryService() throws Exception {
log.info("-- Starting {} test --", methodName);
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
@@ -158,20 +157,18 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
- admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
- "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
- new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
- admin.namespaces().createNamespace("my-property/use/my-ns");
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("without-service-discovery")));
+ admin.namespaces().createNamespace("my-property/without-service-discovery/my-ns");
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
- Consumer consumer = proxyClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+ Consumer consumer = proxyClient.subscribe("persistent://my-property/without-service-discovery/my-ns/my-topic1", "my-subscriber-name",
conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
- Producer producer = proxyClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
+ Producer producer = proxyClient.createProducer("persistent://my-property/without-service-discovery/my-ns/my-topic1", producerConf);
final int msgs = 10;
for (int i = 0; i < msgs; i++) {
String message = "my-message-" + i;
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem
new file mode 100644
index 0000000..63fcf38
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+ Data:
+ Version: 3 (0x2)
+ Serial Number:
+ ac:a4:b3:6b:f5:b4:5f:c9
+ Signature Algorithm: sha1WithRSAEncryption
+ Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+ Validity
+ Not Before: Dec 20 02:22:54 2017 GMT
+ Not After : Dec 20 02:22:54 2018 GMT
+ Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Broker
+ Subject Public Key Info:
+ Public Key Algorithm: rsaEncryption
+ RSA Public Key: (2048 bit)
+ Modulus (2048 bit):
+ 00:ba:ab:bd:1d:68:9e:1f:6d:99:8a:8e:95:8d:dc:
+ b7:e5:95:1a:40:ff:9e:5d:be:38:e6:19:1c:39:0d:
+ 39:e3:e0:cd:96:42:09:41:9f:ca:f1:7f:63:6f:be:
+ a5:46:1b:07:06:01:43:11:ed:e9:f9:a2:41:2a:29:
+ ac:10:d3:df:30:4a:f5:9b:5d:b9:97:2b:d4:10:82:
+ 92:55:e7:ca:b1:eb:94:6a:63:e6:28:a3:75:0e:f2:
+ 5b:ff:1a:df:0b:3e:2d:6b:c8:c1:49:98:2b:c1:5f:
+ 9a:c6:1d:94:26:7f:eb:6f:7e:81:c2:27:23:13:90:
+ 4f:89:04:dd:2c:8d:de:4c:f8:9f:33:b9:28:ed:7e:
+ 3a:14:fa:6f:d0:cc:50:5e:75:40:39:e2:57:46:af:
+ b7:67:8f:c9:57:f2:85:b0:54:59:02:76:c8:92:2c:
+ af:19:3e:09:d8:5f:a4:d0:9c:a7:35:77:c9:aa:90:
+ 50:86:2a:9a:3c:8f:3b:50:a5:01:88:b9:d3:eb:4d:
+ 23:24:f2:58:65:1c:03:7a:0a:2c:20:30:b6:46:8d:
+ b1:65:1c:16:0c:bf:bd:87:df:1c:e6:46:c8:f7:4f:
+ 60:fd:a1:91:c9:e4:ff:21:e7:e8:65:70:ba:9f:d6:
+ 44:07:27:45:1d:69:e7:d6:72:d8:d0:3e:df:2e:61:
+ 9e:4d
+ Exponent: 65537 (0x10001)
+ X509v3 extensions:
+ X509v3 Basic Constraints:
+ CA:FALSE
+ Netscape Comment:
+ OpenSSL Generated Certificate
+ X509v3 Subject Key Identifier:
+ 1C:C6:F7:DB:06:C1:1D:1C:7C:9E:64:AF:E5:47:47:80:00:6C:C8:26
+ X509v3 Authority Key Identifier:
+ keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+ Signature Algorithm: sha1WithRSAEncryption
+ 7f:b4:f8:d6:9c:ea:01:1b:74:19:a9:ee:ea:83:66:11:df:90:
+ c5:f0:e6:bc:05:bd:b4:8a:64:d6:08:fd:75:da:2e:f5:f9:20:
+ e0:62:8b:b8:b7:bd:c3:92:0f:a3:61:c7:78:6a:68:ea:74:20:
+ 8e:a8:b7:0f:28:d1:54:8a:55:af:38:8c:a7:64:79:1c:95:f6:
+ b8:f3:48:0e:14:2b:78:75:ff:96:70:85:28:30:1f:fa:94:a9:
+ 43:cd:98:6e:7b:80:68:bc:08:cc:35:1d:df:34:df:3d:58:52:
+ c3:5d:55:65:b6:be:ef:a2:78:a0:3c:41:c8:af:9f:74:e6:d8:
+ 0a:d3
+-----BEGIN CERTIFICATE-----
+MIIDKzCCApSgAwIBAgIJAKyks2v1tF/JMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIyMjU0
+WhcNMTgxMjIwMDIyMjU0WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGQnJva2VyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuqu9HWie
+H22Zio6Vjdy35ZUaQP+eXb445hkcOQ054+DNlkIJQZ/K8X9jb76lRhsHBgFDEe3p
++aJBKimsENPfMEr1m125lyvUEIKSVefKseuUamPmKKN1DvJb/xrfCz4ta8jBSZgr
+wV+axh2UJn/rb36BwicjE5BPiQTdLI3eTPifM7ko7X46FPpv0MxQXnVAOeJXRq+3
+Z4/JV/KFsFRZAnbIkiyvGT4J2F+k0JynNXfJqpBQhiqaPI87UKUBiLnT600jJPJY
+ZRwDegosIDC2Ro2xZRwWDL+9h98c5kbI909g/aGRyeT/IefoZXC6n9ZEBydFHWnn
+1nLY0D7fLmGeTQIDAQABo3sweTAJBgNVHRMEAjAAMCwGCWCGSAGG+EIBDQQfFh1P
+cGVuU1NMIEdlbmVyYXRlZCBDZXJ0aWZpY2F0ZTAdBgNVHQ4EFgQUHMb32wbBHRx8
+nmSv5UdHgABsyCYwHwYDVR0jBBgwFoAU5RXCHefuKDz6tj5Y+wthUm6wgVswDQYJ
+KoZIhvcNAQEFBQADgYEAf7T41pzqARt0Ganu6oNmEd+QxfDmvAW9tIpk1gj9ddou
+9fkg4GKLuLe9w5IPo2HHeGpo6nQgjqi3DyjRVIpVrziMp2R5HJX2uPNIDhQreHX/
+lnCFKDAf+pSpQ82YbnuAaLwIzDUd3zTfPVhSw11VZba+76J4oDxByK+fdObYCtM=
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem
new file mode 100644
index 0000000..8e47938
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC6q70daJ4fbZmK
+jpWN3LfllRpA/55dvjjmGRw5DTnj4M2WQglBn8rxf2NvvqVGGwcGAUMR7en5okEq
+KawQ098wSvWbXbmXK9QQgpJV58qx65RqY+Yoo3UO8lv/Gt8LPi1ryMFJmCvBX5rG
+HZQmf+tvfoHCJyMTkE+JBN0sjd5M+J8zuSjtfjoU+m/QzFBedUA54ldGr7dnj8lX
+8oWwVFkCdsiSLK8ZPgnYX6TQnKc1d8mqkFCGKpo8jztQpQGIudPrTSMk8lhlHAN6
+CiwgMLZGjbFlHBYMv72H3xzmRsj3T2D9oZHJ5P8h5+hlcLqf1kQHJ0UdaefWctjQ
+Pt8uYZ5NAgMBAAECggEBAIY3Tx1jCDYOppQiGtPKPAr9XsgXQrWiPOTsbwdyRApd
+q1P7HQ6rJs7mygcha1HxwuYFaETu7AkKKZJ4LfhXbiUZ8GgKRpOz9qD8UN0lcO7m
+NGsecvELPfJGPfE5T9+UkDHsQVV57RP3eqAxykC4Pv6GViPT4fuCCj25WpFbW9e4
+uuKFF3yVY3uJofPQGwLZ2b9WwujqgSyaozyKlTM4nPXwEEz56wPVuAsNfmTEtIb3
+N0d0uQpM69irH3sAO7nVDo6e/eP3Emq4kUDvhS04BafG+T7T9g0C74EGoJX5wrrk
+LzuEAkO84n6ESF6r+FI1XH4yskau3Jab8/x8f9sVj+ECgYEA9II7MZ2PSq2pHTsY
+1ZxZx3MKe/yiTMGkHhtQY6HKzzQXgEozK/uPTvMt7lKnBsseUydEXygMcgPXracF
+rFdiAQpD8Dq2jrmjtFcPk40DtLjdUUD4I2stTKprTfTrhx5X/JIX8iBflMTFWBYp
+ALM9qP0u3KZwVCGxEsGz5yaxtZkCgYEAw3Gj5eKw2pzRyNEdNsye3eQxp4QneM+X
+YozWzNrbGEdmJ1CHuMWXPTxAkxtMhH95QonySEP4R1fNxHJNMKPu7h2TiZiLvC/J
+UtE+SdETiEGF14SEfr/LflreTJnHCmK/pp19t1Q1cAn3FHws2D5qiA8eoBmnko6k
+irYydJn5dtUCgYBVOzRhJjg14vVJgDk29QqCsQJdmAIHWZTY/dJ2+IYW1mS+zp6p
+3UXmUnSXV+5rOtC2UcDOnso/0EEVglxC6C78h9SI4B6U//clvRdr6sL481wKn+gf
+iJPA3sMK6K5VamlnXJHGUCyhUjosa4Udfl2nE6KLPeV4Hkp4bFdG40EdOQKBgQCQ
+Y4dDUbt4dnyh0KO1lWwU3/4zFPYYUb00iHo0c8eDY1Q73Um3nvqBud63D2bzSD2s
+g78j1ls5Ucvpwsv2EFZ3QhB6ieFKET+52G4dGMJGWqnns7Yy8b0Dx1wN2Vnr+VI/
+ZIC5DRRBhossbiSvSUVo6Uql2u4q3wj+lWYnMI3VVQKBgQDs+sHMotTK976HKaRh
+sDepJnZwdnma1QBzsAXkZ0EJPqYCIFmbKGeXn/z2Fr62oGqe9suzuGLBYm4ukwoD
+xI8lDzxOoElFNaAHl6nIcFcj6I98idkU05NvV59aeLJngejJv3WmI2GH7jNK8dNs
+ELazMuTsmf+MdG/Q9C/kiHDvng==
+-----END PRIVATE KEY-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem
new file mode 100644
index 0000000..c77dd6c
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem
@@ -0,0 +1,62 @@
+Certificate:
+ Data:
+ Version: 3 (0x2)
+ Serial Number:
+ ac:a4:b3:6b:f5:b4:5f:c8
+ Signature Algorithm: sha1WithRSAEncryption
+ Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+ Validity
+ Not Before: Dec 20 02:21:42 2017 GMT
+ Not After : Dec 19 02:21:42 2020 GMT
+ Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+ Subject Public Key Info:
+ Public Key Algorithm: rsaEncryption
+ RSA Public Key: (1024 bit)
+ Modulus (1024 bit):
+ 00:99:c1:1e:58:35:af:c1:38:38:45:8c:8c:f4:d9:
+ 6d:cc:ff:37:31:f9:ba:76:fa:fb:56:41:04:da:d2:
+ a1:ea:a8:ca:6d:3b:b2:bf:4c:e7:55:ab:1c:a1:7e:
+ d4:ec:54:d8:92:c6:f9:1f:e8:e8:d2:27:fa:4e:bb:
+ e6:b2:21:59:bd:19:63:9f:4b:a1:3d:c0:25:d3:70:
+ a4:9c:96:33:c6:53:c4:40:c1:de:a5:75:40:f7:db:
+ 51:f4:f6:19:9a:8d:a8:fa:0c:4b:fe:1f:11:70:23:
+ 31:76:c2:6c:41:6b:aa:c6:71:22:58:7b:4f:d8:2b:
+ 46:d6:e0:84:4d:57:e0:9c:09
+ Exponent: 65537 (0x10001)
+ X509v3 extensions:
+ X509v3 Subject Key Identifier:
+ E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+ X509v3 Authority Key Identifier:
+ keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+ DirName:/C=US/ST=CA/O=Apache/OU=Apache Incubator/CN=New CA
+ serial:AC:A4:B3:6B:F5:B4:5F:C8
+
+ X509v3 Basic Constraints:
+ CA:TRUE
+ Signature Algorithm: sha1WithRSAEncryption
+ 7c:15:8d:92:14:c2:cf:b6:72:17:ba:ba:e0:7c:48:a0:fb:02:
+ 86:b1:50:90:d0:b2:dd:40:9f:b5:e1:9e:ab:4a:bc:6c:f1:3e:
+ c3:7f:b5:b6:18:ab:f7:f0:a2:35:c6:5b:d7:2d:84:e1:d9:3d:
+ 8c:88:c2:1c:44:61:a8:14:ab:b1:00:b4:00:a5:2d:66:43:86:
+ 53:a2:d6:4a:73:96:b3:4f:63:b5:8d:8d:7f:e4:ff:82:37:81:
+ 63:00:0e:d1:ef:59:0c:7c:2b:79:24:97:06:60:cd:a1:b3:37:
+ 94:68:3d:6c:27:ee:8e:87:88:c1:21:0a:d5:04:66:11:06:11:
+ 69:92
+-----BEGIN CERTIFICATE-----
+MIIC6DCCAlGgAwIBAgIJAKyks2v1tF/IMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIyMTQy
+WhcNMjAxMjE5MDIyMTQyWjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGTmV3IENBMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCZwR5YNa/BODhF
+jIz02W3M/zcx+bp2+vtWQQTa0qHqqMptO7K/TOdVqxyhftTsVNiSxvkf6OjSJ/pO
+u+ayIVm9GWOfS6E9wCXTcKScljPGU8RAwd6ldUD321H09hmajaj6DEv+HxFwIzF2
+wmxBa6rGcSJYe0/YK0bW4IRNV+CcCQIDAQABo4G7MIG4MB0GA1UdDgQWBBTlFcId
+5+4oPPq2Plj7C2FSbrCBWzCBiAYDVR0jBIGAMH6AFOUVwh3n7ig8+rY+WPsLYVJu
+sIFboVukWTBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzANBgNVBAoTBkFw
+YWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UEAxMGTmV3IENB
+ggkArKSza/W0X8gwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQB8FY2S
+FMLPtnIXurrgfEig+wKGsVCQ0LLdQJ+14Z6rSrxs8T7Df7W2GKv38KI1xlvXLYTh
+2T2MiMIcRGGoFKuxALQApS1mQ4ZTotZKc5azT2O1jY1/5P+CN4FjAA7R71kMfCt5
+JJcGYM2hszeUaD1sJ+6Oh4jBIQrVBGYRBhFpkg==
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem
new file mode 100644
index 0000000..741e10a
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+ Data:
+ Version: 3 (0x2)
+ Serial Number:
+ ac:a4:b3:6b:f5:b4:5f:ca
+ Signature Algorithm: sha1WithRSAEncryption
+ Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+ Validity
+ Not Before: Dec 20 02:36:47 2017 GMT
+ Not After : Dec 20 02:36:47 2018 GMT
+ Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Client
+ Subject Public Key Info:
+ Public Key Algorithm: rsaEncryption
+ RSA Public Key: (2048 bit)
+ Modulus (2048 bit):
+ 00:fd:b6:bb:bc:a3:54:2b:06:b3:8e:68:31:e1:f3:
+ 3a:c6:3d:98:83:db:f8:fc:58:c6:35:47:4c:58:c1:
+ 40:81:71:8e:25:2c:6f:14:a0:5f:f2:85:97:fa:e5:
+ d1:a6:65:26:3f:4b:52:f1:4a:11:1b:f6:af:22:fb:
+ 24:74:d7:d3:bd:c3:11:dc:7f:1e:49:96:19:4a:f5:
+ 9c:b3:4c:85:5d:33:57:08:43:04:3d:b0:69:1a:15:
+ b3:08:c7:0d:68:09:02:09:37:90:1b:fa:51:e1:c9:
+ 6d:58:e3:d0:4e:e9:f9:a5:b5:4c:1a:5d:98:62:a2:
+ d6:cd:a2:89:dc:91:52:c7:f5:19:53:97:5f:58:86:
+ 6b:5e:48:6c:81:8d:2f:5c:0e:38:96:d2:b7:f7:47:
+ 21:2e:54:2a:51:32:92:0d:f3:c3:94:f5:59:98:2c:
+ 11:1a:88:ad:ee:16:5c:72:6b:31:e3:bf:ca:9e:38:
+ 4b:49:d2:87:e1:44:69:ef:ba:4d:b9:1d:4b:3f:e0:
+ c1:af:c5:04:6f:5f:2d:6e:d9:12:ac:bb:f1:f8:7f:
+ fc:bd:3a:6a:99:e6:45:f9:91:98:c9:d1:b1:f0:d5:
+ 6a:e1:fd:c0:6e:e2:8e:ab:0c:03:87:ad:9c:26:9a:
+ 8e:93:4c:82:ec:de:25:49:14:91:ce:80:9f:22:17:
+ aa:cf
+ Exponent: 65537 (0x10001)
+ X509v3 extensions:
+ X509v3 Basic Constraints:
+ CA:FALSE
+ Netscape Comment:
+ OpenSSL Generated Certificate
+ X509v3 Subject Key Identifier:
+ B2:8F:75:E3:D7:7A:4C:62:B8:5C:04:66:A0:56:14:16:AF:82:43:5A
+ X509v3 Authority Key Identifier:
+ keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+ Signature Algorithm: sha1WithRSAEncryption
+ 5f:e0:ec:f3:b4:bb:08:a6:15:85:f2:7d:c4:50:c4:87:e5:af:
+ 1a:38:11:98:b1:a1:d6:47:85:f6:c6:80:cc:b3:2b:f6:27:8e:
+ 24:1b:66:98:48:e7:d0:dd:cd:37:ea:a2:ad:cf:d8:a7:17:39:
+ 59:be:72:a1:2a:24:f5:d6:23:14:b9:42:b4:2f:b1:cd:15:98:
+ d9:1a:8a:55:3c:f2:78:be:b4:ba:6b:79:3d:29:e8:54:4b:d8:
+ 0f:1b:bd:69:ef:d2:ca:5c:0f:da:b4:b6:b8:cc:7f:b7:51:3c:
+ fc:3a:dd:6d:9c:3c:9e:71:ad:59:72:84:ac:01:6e:c5:66:8b:
+ b0:70
+-----BEGIN CERTIFICATE-----
+MIIDKzCCApSgAwIBAgIJAKyks2v1tF/KMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIzNjQ3
+WhcNMTgxMjIwMDIzNjQ3WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA/ba7vKNU
+Kwazjmgx4fM6xj2Yg9v4/FjGNUdMWMFAgXGOJSxvFKBf8oWX+uXRpmUmP0tS8UoR
+G/avIvskdNfTvcMR3H8eSZYZSvWcs0yFXTNXCEMEPbBpGhWzCMcNaAkCCTeQG/pR
+4cltWOPQTun5pbVMGl2YYqLWzaKJ3JFSx/UZU5dfWIZrXkhsgY0vXA44ltK390ch
+LlQqUTKSDfPDlPVZmCwRGoit7hZccmsx47/KnjhLSdKH4URp77pNuR1LP+DBr8UE
+b18tbtkSrLvx+H/8vTpqmeZF+ZGYydGx8NVq4f3AbuKOqwwDh62cJpqOk0yC7N4l
+SRSRzoCfIheqzwIDAQABo3sweTAJBgNVHRMEAjAAMCwGCWCGSAGG+EIBDQQfFh1P
+cGVuU1NMIEdlbmVyYXRlZCBDZXJ0aWZpY2F0ZTAdBgNVHQ4EFgQUso9149d6TGK4
+XARmoFYUFq+CQ1owHwYDVR0jBBgwFoAU5RXCHefuKDz6tj5Y+wthUm6wgVswDQYJ
+KoZIhvcNAQEFBQADgYEAX+Ds87S7CKYVhfJ9xFDEh+WvGjgRmLGh1keF9saAzLMr
+9ieOJBtmmEjn0N3NN+qirc/Ypxc5Wb5yoSok9dYjFLlCtC+xzRWY2RqKVTzyeL60
+umt5PSnoVEvYDxu9ae/SylwP2rS2uMx/t1E8/DrdbZw8nnGtWXKErAFuxWaLsHA=
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem
new file mode 100644
index 0000000..81d00f9
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD9tru8o1QrBrOO
+aDHh8zrGPZiD2/j8WMY1R0xYwUCBcY4lLG8UoF/yhZf65dGmZSY/S1LxShEb9q8i
++yR019O9wxHcfx5JlhlK9ZyzTIVdM1cIQwQ9sGkaFbMIxw1oCQIJN5Ab+lHhyW1Y
+49BO6fmltUwaXZhiotbNoonckVLH9RlTl19YhmteSGyBjS9cDjiW0rf3RyEuVCpR
+MpIN88OU9VmYLBEaiK3uFlxyazHjv8qeOEtJ0ofhRGnvuk25HUs/4MGvxQRvXy1u
+2RKsu/H4f/y9OmqZ5kX5kZjJ0bHw1Wrh/cBu4o6rDAOHrZwmmo6TTILs3iVJFJHO
+gJ8iF6rPAgMBAAECggEAEJmkLvOAzk/h769hlCcV8WKWWApMgDZOwa2okSYT0mRb
+qJL/sZnMrVGQYBopXXnAxuNmyeLOu8WoL+G+wOZeNExPHt4yXR41CXKIjjKzhyWU
+zDWWUXL5bXt9+1UKy4PLXk8EXtBCC0Pio65EMuWcL/tsv0zga5O7+jhoTMY1ZF/D
+rsddf2mIncyEdhwAKLREmFv31lY1k+Jd+5eyXHIJEnK8lMXTcORNsb0YtlS5sRTU
+4llwQlBXjV06zIVRFxsRcPrgRYH0Hfg3hSIm3epNE+pbj0tcN0CfQFfrKJ9G2cDS
+jXimjvGsPKQ1PRMAcg93qZB3VtI+ag9bZt29cru0AQKBgQD/xzXZP5hKoqOy+8qH
+HyPvyM0QCpQ6KwHzgf5ATybPIPlyWQmT2eeR3ez4qskowNvc4Fc/q10Ao+q6jC3E
+721Wz6+iCb7Qus37KnEqVW7mWDLsDT5q7vIyRR22wWhrTpu0uZmxd9XxYRU6KUe1
+FMkI5VijJ27NoYtO+gLn9u6J6QKBgQD97xCNVaUNMNRZ1+HOKoBqcGBj91KrL74K
+/avYL0EprYwzN1lm4ZmNX8GaBeAftwnIDyxaM3Apw8BcqFFz/IslY/5sCyUmVjgI
+ZULkhCBy5ZamFNMxLvaN6njtdpgdBRxR9gzke1V/xxJgN7J39h9FI+pElwMW6314
+6AFHYQ/j9wKBgQCNwfjEOQzMgKs9bXNnxAiEwsN0GojgXCmureMd/UBDF8FocJRw
+Txqaq2bEwtLONWUlW2i/rtfSnQZg8YQEW7Y7oMt0gPYydPXoODOUBNl77HH8hbKM
+TXYKCmhXe4XFw0FkvmDCDOqT5vx+yZYmdCifN40Sj65HZTryQHoP2bmG0QKBgG/U
+ntd/hka+4GYIuvsOoKs/flPIEfIt/mXcvZdhiDMQqRPNJmQ2qmcmap6oQ8Hz3Czs
+8b1vtc/O06J6xhRsfeMjnGJ8rgmqItcfsUvuHFQ9ZBEUTsX0RsTNJCCAABGXtJcr
+4xWkc0zooOEa5lAKZk8OuBco4kVvxDxBAH8s8dCVAoGAeEZICuDGR8cOV64Eyx2X
+Ej1PQJrleMmzCwth7UhREGUgEVglhMeoBxmWCukYxpkVBY0DUy6OWH5lpTfCerFZ
+ho1AHMt9DsfUWo4hApMXEMyCZTOJwg9M4vQ1UTbFtr0mt0jnVWTUm3mVxmJnfrtz
+/DgLrvcJd7QCGAYICMNxrDs=
+-----END PRIVATE KEY-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem
new file mode 100644
index 0000000..8b524c8
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+ Data:
+ Version: 3 (0x2)
+ Serial Number:
+ ac:a4:b3:6b:f5:b4:5f:cb
+ Signature Algorithm: sha1WithRSAEncryption
+ Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+ Validity
+ Not Before: Dec 20 02:45:24 2017 GMT
+ Not After : Dec 20 02:45:24 2018 GMT
+ Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Proxy
+ Subject Public Key Info:
+ Public Key Algorithm: rsaEncryption
+ RSA Public Key: (2048 bit)
+ Modulus (2048 bit):
+ 00:e1:e1:06:cc:f5:98:38:88:33:e0:f7:0a:5d:8e:
+ a8:89:ae:8f:cd:c7:77:62:17:c2:a1:d8:fc:fc:d0:
+ d0:86:f1:c8:3c:78:ec:b8:e9:73:1c:d1:72:55:97:
+ c6:47:5a:4c:33:18:32:a1:9c:e1:84:2e:de:40:2f:
+ a7:16:ed:a0:44:d6:4c:2c:04:ef:21:11:0b:6b:cb:
+ 36:8d:eb:5a:3d:a1:b6:9b:b5:23:be:bd:66:23:26:
+ c9:82:62:44:51:f8:3a:94:07:6c:52:84:2c:d0:d9:
+ 24:8b:0a:f5:1b:c8:31:a2:29:4c:bc:b7:bf:96:e1:
+ 56:78:d2:75:08:c9:cb:0d:1a:1d:93:2d:bf:bf:86:
+ 10:06:d7:5c:b8:e6:99:05:89:6f:ad:3b:a6:37:45:
+ 15:3a:63:8b:d1:d6:0d:e4:d0:c6:06:c6:63:13:21:
+ 92:65:c1:1a:ae:1a:72:97:cf:86:ed:6f:a1:77:d8:
+ 18:67:f2:27:36:1f:ff:40:6e:57:97:90:5a:28:04:
+ a4:a8:54:cf:a8:87:36:af:26:49:a6:4e:2d:d4:be:
+ e6:17:e2:1a:da:c4:08:87:fd:3f:fe:7b:d8:1e:f2:
+ 66:0f:34:1a:02:5d:39:ec:66:3d:46:bc:37:ce:84:
+ a2:51:0b:c8:72:f5:7c:5a:b8:1a:1b:0a:5d:2b:e9:
+ 56:4f
+ Exponent: 65537 (0x10001)
+ X509v3 extensions:
+ X509v3 Basic Constraints:
+ CA:FALSE
+ Netscape Comment:
+ OpenSSL Generated Certificate
+ X509v3 Subject Key Identifier:
+ 3F:A7:4A:6A:B1:6A:E1:51:8D:56:19:A2:2D:6A:A8:49:07:D6:87:8A
+ X509v3 Authority Key Identifier:
+ keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+ Signature Algorithm: sha1WithRSAEncryption
+ 98:89:57:fd:96:0e:78:06:ce:9f:83:48:28:c9:34:a4:32:93:
+ d2:65:fb:2f:a9:39:51:ff:7a:89:57:26:6a:59:0d:81:09:20:
+ 75:ae:c6:aa:f6:8c:d4:d2:7f:f0:78:88:df:74:90:28:11:15:
+ 77:d3:60:3d:2d:d2:ef:34:1b:03:59:9f:23:1c:21:64:e5:b8:
+ a1:99:c3:08:82:31:3d:58:01:23:52:b8:96:c8:d5:42:b3:3b:
+ 50:43:cc:7d:43:08:1d:c4:46:06:7f:c3:7f:3e:6d:01:f2:25:
+ 91:4b:70:fd:0f:e3:25:a6:d4:d8:c9:f6:35:65:00:87:c7:03:
+ c2:d7
+-----BEGIN CERTIFICATE-----
+MIIDKjCCApOgAwIBAgIJAKyks2v1tF/LMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDI0NTI0
+WhcNMTgxMjIwMDI0NTI0WjBWMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEOMAwGA1UE
+AxMFUHJveHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDh4QbM9Zg4
+iDPg9wpdjqiJro/Nx3diF8Kh2Pz80NCG8cg8eOy46XMc0XJVl8ZHWkwzGDKhnOGE
+Lt5AL6cW7aBE1kwsBO8hEQtryzaN61o9obabtSO+vWYjJsmCYkRR+DqUB2xShCzQ
+2SSLCvUbyDGiKUy8t7+W4VZ40nUIycsNGh2TLb+/hhAG11y45pkFiW+tO6Y3RRU6
+Y4vR1g3k0MYGxmMTIZJlwRquGnKXz4btb6F32Bhn8ic2H/9AbleXkFooBKSoVM+o
+hzavJkmmTi3UvuYX4hraxAiH/T/+e9ge8mYPNBoCXTnsZj1GvDfOhKJRC8hy9Xxa
+uBobCl0r6VZPAgMBAAGjezB5MAkGA1UdEwQCMAAwLAYJYIZIAYb4QgENBB8WHU9w
+ZW5TU0wgR2VuZXJhdGVkIENlcnRpZmljYXRlMB0GA1UdDgQWBBQ/p0pqsWrhUY1W
+GaItaqhJB9aHijAfBgNVHSMEGDAWgBTlFcId5+4oPPq2Plj7C2FSbrCBWzANBgkq
+hkiG9w0BAQUFAAOBgQCYiVf9lg54Bs6fg0goyTSkMpPSZfsvqTlR/3qJVyZqWQ2B
+CSB1rsaq9ozU0n/weIjfdJAoERV302A9LdLvNBsDWZ8jHCFk5bihmcMIgjE9WAEj
+UriWyNVCsztQQ8x9QwgdxEYGf8N/Pm0B8iWRS3D9D+MlptTYyfY1ZQCHxwPC1w==
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem
new file mode 100644
index 0000000..9856807
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDh4QbM9Zg4iDPg
+9wpdjqiJro/Nx3diF8Kh2Pz80NCG8cg8eOy46XMc0XJVl8ZHWkwzGDKhnOGELt5A
+L6cW7aBE1kwsBO8hEQtryzaN61o9obabtSO+vWYjJsmCYkRR+DqUB2xShCzQ2SSL
+CvUbyDGiKUy8t7+W4VZ40nUIycsNGh2TLb+/hhAG11y45pkFiW+tO6Y3RRU6Y4vR
+1g3k0MYGxmMTIZJlwRquGnKXz4btb6F32Bhn8ic2H/9AbleXkFooBKSoVM+ohzav
+JkmmTi3UvuYX4hraxAiH/T/+e9ge8mYPNBoCXTnsZj1GvDfOhKJRC8hy9XxauBob
+Cl0r6VZPAgMBAAECggEBAIXa6UHKhKNzq3K0UxMwOBYnORbUDp41wGRTB1D2maxu
+WZ/kdTv7M/ku8VdhsuGT1DYvL8nwAwBnGdPlqVoABYrlh4xKfD8XL7J4YWLmxrph
+O6q4RG+DI6TPFnlKrHv64xPX9kxMAZbeJzayjqAhGbCkUtI+/a126dx9s1c65jZj
+VyEDrfogOi3CUVHnTxZ3Yayy0gqldPAYdtt9p5YYyTxJYmuKqHBTh7FToX3RhyT0
+pZ4+IE7YV4HiBev2K8K6c4E2/UOZtkENCLy7DAQuQgokHYk0YeoG+tYfnBcIFkVD
+169Z766il027ILS8F7HMoBPQVYdf24YUgfQC3k8h8HECgYEA/VGEr3vFwxCUHtOK
+SKXCpFWpK0KvcYBQvgzLuKkTNbTWnezUwAugq+Ybao/hqsF5jEd9U8Iv35myHI8j
+EHHF9J8/zb1EcIZgTAPO4Uvc2rYxwt/c0kwy7F/FovVKg5yEscJ35iXQWFO5Yxyu
+rYU8yNVPBqXGCeUS1jJbryg1JZcCgYEA5EUmDfPHp6gWx9MmeuDqxvb2L/WHyxGb
+ojSsV5GFlCLa3QMKc1H/1+6lxLbMiGvtk2S1B9YeGWAvRB+10GSgn7AhiObxv20C
+8oqRtLPxO/eCCGOBnUiGTqKibFNyTVJ/+FgWpywQSUY8tk58fPBZvydE6XV0Wa6T
+1INerLxVnAkCgYAxkXn9PKL+AIh7X7l3bbggoAJyTKI3+3vRNH/IqozvvWshi+41
+hhDykhxbRbxKxYEbSgHkGeN0RYbsv7WEyj6KF39MqvRxcFn3hec9frLAuVYTY+q5
+2987EaKCuKzUBBSTFBKSHmQeZIOqOTqVCbVTNyo3isittv1wnHoEVEHSEQKBgQCM
+oQkjuVb8M/Ls4mmndB9Pul/LBhHFijB+isLOJAnOTHbXiAMNLqxWpGCdwxxYw10W
+3AknLcNXUMltx7dkDkpidskCJX0zuH4DXFkNoXnxvrbuYhc9Bawwj8NOx0340uWh
+4ur5zIywB8RpcAsDkbNIr3Gl/kVS5tmOJ+zQsCpxuQKBgQCKV6CDtKgGLgWvERUE
+Dei9pUx2uXtvThZomqoZqr+hZE3YmvtHZcLMK8sXJWDdkYVQ4bwDkmrSSkk5F9Nh
+PClfyOObFbOXLD0TrJZSJd/zrnmnWk8u4eE5XSwAQ+0XiO4LgQHDOutXpvW9ZVvT
+om8NGk5mEUz39XN0tuWzcN2FIQ==
+-----END PRIVATE KEY-----
--
To stop receiving notification emails like this one, please contact
jai1@apache.org.