You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/08 02:14:30 UTC

[GitHub] jai1 closed pull request #1169: Proxy forward auth data

jai1 closed pull request #1169: Proxy forward auth data
URL: https://github.com/apache/incubator-pulsar/pull/1169
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 780b192d0..609cddd2c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -169,6 +169,13 @@ enableRunBookieTogether=false
 enableRunBookieAutoRecoveryTogether=false
 
 ### --- Authentication --- ###
+# Role names that are treated as "proxy roles". If the broker sees a request with
+#role as proxyRoles - it will demand to see a valid original principal.
+proxyRoles=
+
+# If this flag is set then the broker authenticates the original Auth data
+# else it just accepts the originalPrincipal and authorizes it (if required).  
+authenticateOriginalAuthData=false
 
 # Enable TLS
 tlsEnabled=false
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 2ecc9b139..dafcfc401 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -57,6 +57,10 @@ brokerClientAuthenticationParameters=
 # operations and publish/consume from all topics (comma-separated)
 superUserRoles=
 
+# Forward client authorization Credentials to Broker for re authorization
+# make sure authentication is enabled for this to take effect
+forwardAuthorizationCredentials=false
+
 ##### --- TLS --- #####
 
 # Enable TLS in the proxy
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 52b31748c..c75dd745b 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -152,6 +152,13 @@ enablePersistentTopics=true
 enableNonPersistentTopics=true
 
 ### --- Authentication --- ###
+# Role names that are treated as "proxy roles". If the broker sees a request with
+#role as proxyRoles - it will demand to see a valid original principal.
+proxyRoles=
+
+# If this flag is set then the broker authenticates the original Auth data
+# else it just accepts the originalPrincipal and authorizes it (if required). 
+authenticateOriginalAuthData=false
 
 # Enable authentication
 authenticationEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0a3c39972..de27fd9db 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -197,6 +197,10 @@
     // role as proxyRoles - it will demand to see the original client role or certificate.
     private Set<String> proxyRoles = Sets.newTreeSet();
 
+    // If this flag is set then the broker authenticates the original Auth data
+    // else it just accepts the originalPrincipal and authorizes it (if required). 
+    private boolean authenticateOriginalAuthData = false;
+
     // Allow wildcard matching in authorization
     // (wildcard matching only applicable if wildcard-char:
     // * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
@@ -1377,4 +1381,12 @@ public boolean exposeTopicLevelMetricsInPrometheus() {
     public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
         this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
     }
+    
+    public boolean authenticateOriginalAuthData() {
+        return authenticateOriginalAuthData;
+    }
+
+    public void setAuthenticateOriginalAuthData(boolean authenticateOriginalAuthData) {
+        this.authenticateOriginalAuthData = authenticateOriginalAuthData;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index efa3ed916..05fb749c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -106,8 +106,9 @@
     private int nonPersistentPendingMessages = 0;
     private final int MaxNonPersistentPendingMessages;
     private String originalPrincipal = null;
-    private Set<String> proxyRoles = Sets.newHashSet();
-
+    private Set<String> proxyRoles;
+    private boolean authenticateOriginalAuthData;
+    
     enum State {
         Start, Connected, Failed
     }
@@ -124,6 +125,7 @@ public ServerCnx(BrokerService service) {
         this.MaxNonPersistentPendingMessages = service.pulsar().getConfiguration()
                 .getMaxConcurrentNonPersistentMessagePerConnection();
         this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
+        this.authenticateOriginalAuthData = service.pulsar().getConfiguration().authenticateOriginalAuthData();
     }
 
     @Override
@@ -214,11 +216,24 @@ protected void handleLookup(CommandLookupTopic lookup) {
         if (topicName == null) {
             return;
         }
-
+        
+        String originalPrincipal = null;
+        if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
+            originalPrincipal = validateOriginalPrincipal(
+                    lookup.hasOriginalAuthData() ? lookup.getOriginalAuthData() : null,
+                    lookup.hasOriginalAuthMethod() ? lookup.getOriginalAuthMethod() : null,
+                    lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal, requestId,
+                    lookup);
+
+            if (originalPrincipal == null) {
+                return;
+            }
+        } else {
+            originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal;
+        }
+        
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
-        if (lookupSemaphore.tryAcquire()) {
-            final String originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal()
-                    : this.originalPrincipal;
+        if (lookupSemaphore.tryAcquire()) {            
             if (invalidOriginalPrincipal(originalPrincipal)) {
                 final String msg = "Valid Proxy Client role should be provided for lookup ";
                 log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
@@ -234,11 +249,11 @@ protected void handleLookup(CommandLookupTopic lookup) {
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
-
+            String finalOriginalPrincipal = originalPrincipal;
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
                 if (isProxyAuthorized) {
                     lookupDestinationAsync(getBrokerService().pulsar(), topicName,
-                            lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole,
+                            lookup.getAuthoritative(), finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole,
                             lookup.getRequestId()).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     ctx.writeAndFlush(lookupResponse);
@@ -287,11 +302,24 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
         if (topicName == null) {
             return;
         }
-
+        String originalPrincipal = null;
+        if (authenticateOriginalAuthData && partitionMetadata.hasOriginalAuthData()) {
+            originalPrincipal = validateOriginalPrincipal(
+                    partitionMetadata.hasOriginalAuthData() ? partitionMetadata.getOriginalAuthData() : null,
+                    partitionMetadata.hasOriginalAuthMethod() ? partitionMetadata.getOriginalAuthMethod() : null,
+                    partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal()
+                            : this.originalPrincipal,
+                    requestId, partitionMetadata);
+
+            if (originalPrincipal == null) {
+                return;
+            }
+        } else {
+            originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
+        }
+        
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
-                    ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
             if (invalidOriginalPrincipal(originalPrincipal)) {
                 final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
                 log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
@@ -308,10 +336,11 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
+            String finalOriginalPrincipal = originalPrincipal;
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
                     if (isProxyAuthorized) {
                     getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                            originalPrincipal != null ? originalPrincipal : authRole, topicName)
+                            finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, topicName)
                                     .handle((metadata, ex) -> {
                                     if (ex == null) {
                                         int partitions = metadata.partitions;
@@ -410,6 +439,39 @@ protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
 
         return commandConsumerStatsResponseBuilder;
     }
+    
+    private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, Long requestId, GeneratedMessageLite request) {
+        ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
+        SSLSession sslSession = null;
+        if (sslHandler != null) {
+            sslSession = ((SslHandler) sslHandler).engine().getSession();
+        }
+        try {
+            return getOriginalPrincipal(originalAuthData, originalAuthMethod, originalPrincipal, sslSession);
+        } catch (AuthenticationException e) {
+            String msg = "Unable to authenticate original authdata ";
+            log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
+            if (request instanceof CommandLookupTopic) {
+                ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthenticationError, msg, requestId));
+            } else if (request instanceof CommandPartitionedTopicMetadata) {
+                ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthenticationError, msg, requestId));
+            }
+            return null;
+        }
+    }
+    
+    private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal,
+            SSLSession sslSession) throws AuthenticationException {
+        if (authenticateOriginalAuthData) {
+            if (originalAuthData != null) {
+                originalPrincipal = getBrokerService().getAuthenticationService().authenticate(
+                        new AuthenticationDataCommand(originalAuthData, remoteAddress, sslSession), originalAuthMethod);
+            } else {
+                originalPrincipal = null;
+            }
+        }
+        return originalPrincipal;
+    }
 
     @Override
     protected void handleConnect(CommandConnect connect) {
@@ -430,8 +492,11 @@ protected void handleConnect(CommandConnect connect) {
                 if (sslHandler != null) {
                     sslSession = ((SslHandler) sslHandler).engine().getSession();
                 }
-
-                originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
+                originalPrincipal = getOriginalPrincipal(
+                        connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null,
+                        connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
+                        connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
+                        sslSession);
                 authRole = getBrokerService().getAuthenticationService()
                         .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 4b895d463..bc0e56180 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -271,7 +271,7 @@ public void testKeepAliveNotEnforcedWithOlderClients() throws Exception {
         assertEquals(serverCnx.getState(), State.Start);
 
         // test server response to CONNECT
-        ByteBuf clientCommand = Commands.newConnect("none", "", ProtocolVersion.v0.getNumber(), null, null, null);
+        ByteBuf clientCommand = Commands.newConnect("none", "", ProtocolVersion.v0.getNumber(), null, null, null, null, null);
         channel.writeInbound(clientCommand);
 
         assertEquals(serverCnx.getState(), State.Connected);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 4fc324105..4376b013f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -79,19 +79,23 @@
     private static final int checksumSize = 4;
 
     public static ByteBuf newConnect(String authMethodName, String authData, String libVersion) {
-        return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, null /* target broker */, null /* originalPrincipal */);
+        return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, null /* target broker */,
+                null /* originalPrincipal */, null /* Client Auth Data */, null /* Client Auth Method */);
     }
 
     public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker) {
-        return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, null);
+        return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, null, null, null);
     }
 
-    public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker, String originalPrincipal) {
-        return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, originalPrincipal);
+    public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker,
+            String originalPrincipal, String clientAuthData, String clientAuthMethod) {
+        return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker,
+                originalPrincipal, clientAuthData, clientAuthMethod);
     }
 
     public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion,
-            String targetBroker, String originalPrincipal) {
+            String targetBroker, String originalPrincipal, String originalAuthData,
+            String originalAuthMethod) {
         CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
         connectBuilder.setClientVersion(libVersion != null ? libVersion : "Pulsar Client");
         connectBuilder.setAuthMethodName(authMethodName);
@@ -116,6 +120,13 @@ public static ByteBuf newConnect(String authMethodName, String authData, int pro
             connectBuilder.setOriginalPrincipal(originalPrincipal);
         }
 
+        if (originalAuthData != null) {
+            connectBuilder.setOriginalAuthData(originalAuthData);
+        }
+        
+        if (originalAuthMethod != null) {
+            connectBuilder.setOriginalAuthMethod(originalAuthMethod);
+        }
         connectBuilder.setProtocolVersion(protocolVersion);
         CommandConnect connect = connectBuilder.build();
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect));
@@ -450,7 +461,7 @@ public static ByteBuf newPartitionMetadataResponse(ServerError error, String err
     }
 
     public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
-        return Commands.newPartitionMetadataRequest(topic, requestId, null);
+        return Commands.newPartitionMetadataRequest(topic, requestId, null, null, null);
     }
 
     public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
@@ -469,7 +480,7 @@ public static ByteBuf newPartitionMetadataResponse(int partitions, long requestI
     }
 
     public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
-        return Commands.newLookup(topic, authoritative, null, requestId);
+        return Commands.newLookup(topic, authoritative, null, null, null, requestId);
     }
 
     public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
@@ -875,12 +886,20 @@ private static int getCurrentProtocolVersion() {
         None;
     }
 
-    public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String clientAuthRole) {
+    public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String originalAuthRole,
+            String originalAuthData, String originalAuthMethod) {
         CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
         partitionMetadataBuilder.setTopic(topic);
         partitionMetadataBuilder.setRequestId(requestId);
-        if (clientAuthRole != null) { 
-            partitionMetadataBuilder.setOriginalPrincipal(clientAuthRole);
+        if (originalAuthRole != null) {
+            partitionMetadataBuilder.setOriginalPrincipal(originalAuthRole);
+        }
+        if (originalAuthData != null) {
+            partitionMetadataBuilder.setOriginalAuthData(originalAuthData);
+        }
+
+        if (originalAuthMethod != null) {
+            partitionMetadataBuilder.setOriginalAuthMethod(originalAuthMethod);
         }
         CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
         ByteBuf res = serializeWithSize(
@@ -890,13 +909,21 @@ public static ByteBuf newPartitionMetadataRequest(String topic, long requestId,
         return res;
     }
 
-    public static ByteBuf newLookup(String topic, boolean authoritative, String clientAuthRole, long requestId) {
+    public static ByteBuf newLookup(String topic, boolean authoritative, String originalAuthRole,
+            String originalAuthData, String originalAuthMethod, long requestId) {
         CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
         lookupTopicBuilder.setTopic(topic);
         lookupTopicBuilder.setRequestId(requestId);
         lookupTopicBuilder.setAuthoritative(authoritative);
-        if (clientAuthRole != null) {
-            lookupTopicBuilder.setOriginalPrincipal(clientAuthRole);
+        if (originalAuthRole != null) {
+            lookupTopicBuilder.setOriginalPrincipal(originalAuthRole);
+        }
+        if (originalAuthData != null) {
+            lookupTopicBuilder.setOriginalAuthData(originalAuthData);
+        }
+
+        if (originalAuthMethod != null) {
+            lookupTopicBuilder.setOriginalAuthMethod(originalAuthMethod);
         }
         CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 16a3cf016..8b1ee0e72 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4373,6 +4373,14 @@ public Builder clearPayloadSize() {
     // optional string original_principal = 7;
     boolean hasOriginalPrincipal();
     String getOriginalPrincipal();
+    
+    // optional string original_auth_data = 8;
+    boolean hasOriginalAuthData();
+    String getOriginalAuthData();
+    
+    // optional string original_auth_method = 9;
+    boolean hasOriginalAuthMethod();
+    String getOriginalAuthMethod();
   }
   public static final class CommandConnect extends
       com.google.protobuf.GeneratedMessageLite
@@ -4569,6 +4577,70 @@ public String getOriginalPrincipal() {
       }
     }
     
+    // optional string original_auth_data = 8;
+    public static final int ORIGINAL_AUTH_DATA_FIELD_NUMBER = 8;
+    private java.lang.Object originalAuthData_;
+    public boolean hasOriginalAuthData() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    public String getOriginalAuthData() {
+      java.lang.Object ref = originalAuthData_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalAuthData_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalAuthDataBytes() {
+      java.lang.Object ref = originalAuthData_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalAuthData_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // optional string original_auth_method = 9;
+    public static final int ORIGINAL_AUTH_METHOD_FIELD_NUMBER = 9;
+    private java.lang.Object originalAuthMethod_;
+    public boolean hasOriginalAuthMethod() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    public String getOriginalAuthMethod() {
+      java.lang.Object ref = originalAuthMethod_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalAuthMethod_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalAuthMethodBytes() {
+      java.lang.Object ref = originalAuthMethod_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalAuthMethod_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       clientVersion_ = "";
       authMethod_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod.AuthMethodNone;
@@ -4577,6 +4649,8 @@ private void initFields() {
       protocolVersion_ = 0;
       proxyToBrokerUrl_ = "";
       originalPrincipal_ = "";
+      originalAuthData_ = "";
+      originalAuthMethod_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4620,6 +4694,12 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeBytes(7, getOriginalPrincipalBytes());
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeBytes(8, getOriginalAuthDataBytes());
+      }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeBytes(9, getOriginalAuthMethodBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4656,6 +4736,14 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(7, getOriginalPrincipalBytes());
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(8, getOriginalAuthDataBytes());
+      }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(9, getOriginalAuthMethodBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4783,6 +4871,10 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000020);
         originalPrincipal_ = "";
         bitField0_ = (bitField0_ & ~0x00000040);
+        originalAuthData_ = "";
+        bitField0_ = (bitField0_ & ~0x00000080);
+        originalAuthMethod_ = "";
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
       
@@ -4844,6 +4936,14 @@ public Builder clone() {
           to_bitField0_ |= 0x00000040;
         }
         result.originalPrincipal_ = originalPrincipal_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.originalAuthData_ = originalAuthData_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        result.originalAuthMethod_ = originalAuthMethod_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4871,6 +4971,12 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandCon
         if (other.hasOriginalPrincipal()) {
           setOriginalPrincipal(other.getOriginalPrincipal());
         }
+        if (other.hasOriginalAuthData()) {
+          setOriginalAuthData(other.getOriginalAuthData());
+        }
+        if (other.hasOriginalAuthMethod()) {
+          setOriginalAuthMethod(other.getOriginalAuthMethod());
+        }
         return this;
       }
       
@@ -4943,6 +5049,16 @@ public Builder mergeFrom(
               originalPrincipal_ = input.readBytes();
               break;
             }
+            case 66: {
+              bitField0_ |= 0x00000080;
+              originalAuthData_ = input.readBytes();
+              break;
+            }
+            case 74: {
+              bitField0_ |= 0x00000100;
+              originalAuthMethod_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -5162,6 +5278,78 @@ void setOriginalPrincipal(com.google.protobuf.ByteString value) {
         
       }
       
+      // optional string original_auth_data = 8;
+      private java.lang.Object originalAuthData_ = "";
+      public boolean hasOriginalAuthData() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      public String getOriginalAuthData() {
+        java.lang.Object ref = originalAuthData_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalAuthData_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalAuthData(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000080;
+        originalAuthData_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalAuthData() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        originalAuthData_ = getDefaultInstance().getOriginalAuthData();
+        
+        return this;
+      }
+      void setOriginalAuthData(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000080;
+        originalAuthData_ = value;
+        
+      }
+      
+      // optional string original_auth_method = 9;
+      private java.lang.Object originalAuthMethod_ = "";
+      public boolean hasOriginalAuthMethod() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      public String getOriginalAuthMethod() {
+        java.lang.Object ref = originalAuthMethod_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalAuthMethod_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalAuthMethod(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000100;
+        originalAuthMethod_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalAuthMethod() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        originalAuthMethod_ = getDefaultInstance().getOriginalAuthMethod();
+        
+        return this;
+      }
+      void setOriginalAuthMethod(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000100;
+        originalAuthMethod_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandConnect)
     }
     
@@ -6846,6 +7034,14 @@ public Builder clearReadCompacted() {
     // optional string original_principal = 3;
     boolean hasOriginalPrincipal();
     String getOriginalPrincipal();
+    
+    // optional string original_auth_data = 4;
+    boolean hasOriginalAuthData();
+    String getOriginalAuthData();
+    
+    // optional string original_auth_method = 5;
+    boolean hasOriginalAuthMethod();
+    String getOriginalAuthMethod();
   }
   public static final class CommandPartitionedTopicMetadata extends
       com.google.protobuf.GeneratedMessageLite
@@ -6958,10 +7154,76 @@ public String getOriginalPrincipal() {
       }
     }
     
+    // optional string original_auth_data = 4;
+    public static final int ORIGINAL_AUTH_DATA_FIELD_NUMBER = 4;
+    private java.lang.Object originalAuthData_;
+    public boolean hasOriginalAuthData() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public String getOriginalAuthData() {
+      java.lang.Object ref = originalAuthData_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalAuthData_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalAuthDataBytes() {
+      java.lang.Object ref = originalAuthData_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalAuthData_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // optional string original_auth_method = 5;
+    public static final int ORIGINAL_AUTH_METHOD_FIELD_NUMBER = 5;
+    private java.lang.Object originalAuthMethod_;
+    public boolean hasOriginalAuthMethod() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public String getOriginalAuthMethod() {
+      java.lang.Object ref = originalAuthMethod_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalAuthMethod_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalAuthMethodBytes() {
+      java.lang.Object ref = originalAuthMethod_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalAuthMethod_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       topic_ = "";
       requestId_ = 0L;
       originalPrincipal_ = "";
+      originalAuthData_ = "";
+      originalAuthMethod_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6997,6 +7259,12 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, getOriginalPrincipalBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, getOriginalAuthDataBytes());
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, getOriginalAuthMethodBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -7017,6 +7285,14 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, getOriginalPrincipalBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getOriginalAuthDataBytes());
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, getOriginalAuthMethodBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -7136,6 +7412,10 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000002);
         originalPrincipal_ = "";
         bitField0_ = (bitField0_ & ~0x00000004);
+        originalAuthData_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
+        originalAuthMethod_ = "";
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -7181,6 +7461,14 @@ public Builder clone() {
           to_bitField0_ |= 0x00000004;
         }
         result.originalPrincipal_ = originalPrincipal_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.originalAuthData_ = originalAuthData_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.originalAuthMethod_ = originalAuthMethod_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -7196,6 +7484,12 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandPar
         if (other.hasOriginalPrincipal()) {
           setOriginalPrincipal(other.getOriginalPrincipal());
         }
+        if (other.hasOriginalAuthData()) {
+          setOriginalAuthData(other.getOriginalAuthData());
+        }
+        if (other.hasOriginalAuthMethod()) {
+          setOriginalAuthMethod(other.getOriginalAuthMethod());
+        }
         return this;
       }
       
@@ -7248,6 +7542,16 @@ public Builder mergeFrom(
               originalPrincipal_ = input.readBytes();
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              originalAuthData_ = input.readBytes();
+              break;
+            }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              originalAuthMethod_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -7347,6 +7651,78 @@ void setOriginalPrincipal(com.google.protobuf.ByteString value) {
         
       }
       
+      // optional string original_auth_data = 4;
+      private java.lang.Object originalAuthData_ = "";
+      public boolean hasOriginalAuthData() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public String getOriginalAuthData() {
+        java.lang.Object ref = originalAuthData_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalAuthData_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalAuthData(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        originalAuthData_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalAuthData() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        originalAuthData_ = getDefaultInstance().getOriginalAuthData();
+        
+        return this;
+      }
+      void setOriginalAuthData(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000008;
+        originalAuthData_ = value;
+        
+      }
+      
+      // optional string original_auth_method = 5;
+      private java.lang.Object originalAuthMethod_ = "";
+      public boolean hasOriginalAuthMethod() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public String getOriginalAuthMethod() {
+        java.lang.Object ref = originalAuthMethod_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalAuthMethod_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalAuthMethod(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        originalAuthMethod_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalAuthMethod() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        originalAuthMethod_ = getDefaultInstance().getOriginalAuthMethod();
+        
+        return this;
+      }
+      void setOriginalAuthMethod(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000010;
+        originalAuthMethod_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandPartitionedTopicMetadata)
     }
     
@@ -8026,6 +8402,14 @@ void setMessage(com.google.protobuf.ByteString value) {
     // optional string original_principal = 4;
     boolean hasOriginalPrincipal();
     String getOriginalPrincipal();
+    
+    // optional string original_auth_data = 5;
+    boolean hasOriginalAuthData();
+    String getOriginalAuthData();
+    
+    // optional string original_auth_method = 6;
+    boolean hasOriginalAuthMethod();
+    String getOriginalAuthMethod();
   }
   public static final class CommandLookupTopic extends
       com.google.protobuf.GeneratedMessageLite
@@ -8148,11 +8532,77 @@ public String getOriginalPrincipal() {
       }
     }
     
+    // optional string original_auth_data = 5;
+    public static final int ORIGINAL_AUTH_DATA_FIELD_NUMBER = 5;
+    private java.lang.Object originalAuthData_;
+    public boolean hasOriginalAuthData() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public String getOriginalAuthData() {
+      java.lang.Object ref = originalAuthData_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalAuthData_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalAuthDataBytes() {
+      java.lang.Object ref = originalAuthData_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalAuthData_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // optional string original_auth_method = 6;
+    public static final int ORIGINAL_AUTH_METHOD_FIELD_NUMBER = 6;
+    private java.lang.Object originalAuthMethod_;
+    public boolean hasOriginalAuthMethod() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public String getOriginalAuthMethod() {
+      java.lang.Object ref = originalAuthMethod_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalAuthMethod_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalAuthMethodBytes() {
+      java.lang.Object ref = originalAuthMethod_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalAuthMethod_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       topic_ = "";
       requestId_ = 0L;
       authoritative_ = false;
       originalPrincipal_ = "";
+      originalAuthData_ = "";
+      originalAuthMethod_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8191,6 +8641,12 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeBytes(4, getOriginalPrincipalBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, getOriginalAuthDataBytes());
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBytes(6, getOriginalAuthMethodBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -8215,6 +8671,14 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(4, getOriginalPrincipalBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, getOriginalAuthDataBytes());
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(6, getOriginalAuthMethodBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -8336,6 +8800,10 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000004);
         originalPrincipal_ = "";
         bitField0_ = (bitField0_ & ~0x00000008);
+        originalAuthData_ = "";
+        bitField0_ = (bitField0_ & ~0x00000010);
+        originalAuthMethod_ = "";
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -8385,6 +8853,14 @@ public Builder clone() {
           to_bitField0_ |= 0x00000008;
         }
         result.originalPrincipal_ = originalPrincipal_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.originalAuthData_ = originalAuthData_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.originalAuthMethod_ = originalAuthMethod_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -8403,6 +8879,12 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandLoo
         if (other.hasOriginalPrincipal()) {
           setOriginalPrincipal(other.getOriginalPrincipal());
         }
+        if (other.hasOriginalAuthData()) {
+          setOriginalAuthData(other.getOriginalAuthData());
+        }
+        if (other.hasOriginalAuthMethod()) {
+          setOriginalAuthMethod(other.getOriginalAuthMethod());
+        }
         return this;
       }
       
@@ -8460,6 +8942,16 @@ public Builder mergeFrom(
               originalPrincipal_ = input.readBytes();
               break;
             }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              originalAuthData_ = input.readBytes();
+              break;
+            }
+            case 50: {
+              bitField0_ |= 0x00000020;
+              originalAuthMethod_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -8580,6 +9072,78 @@ void setOriginalPrincipal(com.google.protobuf.ByteString value) {
         
       }
       
+      // optional string original_auth_data = 5;
+      private java.lang.Object originalAuthData_ = "";
+      public boolean hasOriginalAuthData() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public String getOriginalAuthData() {
+        java.lang.Object ref = originalAuthData_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalAuthData_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalAuthData(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        originalAuthData_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalAuthData() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        originalAuthData_ = getDefaultInstance().getOriginalAuthData();
+        
+        return this;
+      }
+      void setOriginalAuthData(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000010;
+        originalAuthData_ = value;
+        
+      }
+      
+      // optional string original_auth_method = 6;
+      private java.lang.Object originalAuthMethod_ = "";
+      public boolean hasOriginalAuthMethod() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public String getOriginalAuthMethod() {
+        java.lang.Object ref = originalAuthMethod_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalAuthMethod_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalAuthMethod(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000020;
+        originalAuthMethod_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalAuthMethod() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        originalAuthMethod_ = getDefaultInstance().getOriginalAuthMethod();
+        
+        return this;
+      }
+      void setOriginalAuthMethod(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000020;
+        originalAuthMethod_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandLookupTopic)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 3b8727354..db2eec4f5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -149,8 +149,15 @@ message CommandConnect {
 
 	// Original principal that was verified by
 	// a Pulsar proxy. In this case the auth info above
-	// will the the auth of the proxy itself
+	// will be the auth of the proxy itself
 	optional string original_principal = 7;
+	
+	// Original auth role and auth Method that was passed 
+	// to the proxy. In this case the auth info above
+	// will be the auth of the proxy itself	
+	optional string original_auth_data = 8;
+	optional string original_auth_method = 9;
+	
 }
 
 message CommandConnected {
@@ -191,7 +198,14 @@ message CommandSubscribe {
 message CommandPartitionedTopicMetadata {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
+	// Original principal that was verified by
+	// a Pulsar proxy.
 	optional string original_principal = 3;
+	
+	// Original auth role and auth Method that was passed 
+	// to the proxy.	
+	optional string original_auth_data = 4;
+	optional string original_auth_method = 5;
 }
 
 message CommandPartitionedTopicMetadataResponse {
@@ -210,7 +224,15 @@ message CommandLookupTopic {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
 	optional bool authoritative      = 3 [default = false];
+	
+	// Original principal that was verified by
+	// a Pulsar proxy.
 	optional string original_principal = 4;
+	
+	// Original auth role and auth Method that was passed 
+	// to the proxy. 	
+	optional string original_auth_data = 5;
+	optional string original_auth_method = 6;
 }
 
 message CommandLookupTopicResponse {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index d85342530..9fc2ff57d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -33,6 +33,7 @@
 import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -52,6 +53,9 @@
     private Channel inboundChannel;
     Channel outboundChannel;
     private String originalPrincipal;
+    private String clientAuthData;
+    private String clientAuthMethod;
+    private boolean forwardAuthData;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
@@ -60,8 +64,11 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection,
         this.authentication = service.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
         this.originalPrincipal = proxyConnection.clientAuthRole;
+        this.clientAuthData = proxyConnection.clientAuthData;
+        this.clientAuthMethod = proxyConnection.clientAuthMethod;
         ProxyConfiguration config = service.getConfiguration();
-
+        this.forwardAuthData = service.getConfiguration().forwardAuthorizationCredentials();
+        
         // Start the connection attempt.
         Bootstrap b = new Bootstrap();
         // Tie the backend connection on the same thread to avoid context switches when passing data between the 2
@@ -136,8 +143,10 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
             if (authentication.getAuthData().hasDataFromCommand()) {
                 authData = authentication.getAuthData().getCommandData();
             }
-            outboundChannel.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData,
-                    "Pulsar proxy", null /* target broker */, originalPrincipal));
+            ByteBuf command = null;
+            command = Commands.newConnect(authentication.getAuthMethodName(), authData, "Pulsar proxy",
+                    null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
+            outboundChannel.writeAndFlush(command);
             outboundChannel.read();
         }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 5d35b862f..f9ba3c174 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -116,7 +116,8 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
             long requestId = service.newRequestId();
             ByteBuf command;
             if (service.getConfiguration().isAuthenticationEnabled()) {
-                command = Commands.newLookup(topic, authoritative, proxyConnection.clientAuthRole, requestId);
+                command = Commands.newLookup(topic, authoritative, proxyConnection.clientAuthRole,
+                        proxyConnection.clientAuthData, proxyConnection.clientAuthMethod, requestId);
             } else {
                 command = Commands.newLookup(topic, authoritative, requestId);
             }
@@ -196,7 +197,9 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
                 long requestId = service.newRequestId();
                 ByteBuf command;
                 if (service.getConfiguration().isAuthenticationEnabled()) {
-                    command = Commands.newPartitionMetadataRequest(dn.toString(), requestId, proxyConnection.clientAuthRole);
+                    command = Commands.newPartitionMetadataRequest(dn.toString(), requestId,
+                            proxyConnection.clientAuthRole, proxyConnection.clientAuthData,
+                            proxyConnection.clientAuthMethod);
                 } else {
                     command = Commands.newPartitionMetadataRequest(dn.toString(), requestId);
                 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 1c45c4fed..0cd2c2936 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -59,7 +59,10 @@
     private Set<String> authenticationProviders = Sets.newTreeSet();
     // Enforce authorization
     private boolean authorizationEnabled = false;
-
+    // Forward client authData to Broker for re authorization
+    // make sure authentication is enabled for this to take effect
+    private boolean forwardAuthorizationCredentials = false;
+            
     // Authentication settings of the proxy itself. Used to connect to brokers
     private String brokerClientAuthenticationPlugin;
     private String brokerClientAuthenticationParameters;
@@ -82,6 +85,14 @@
 
     private Properties properties = new Properties();
 
+    public boolean forwardAuthorizationCredentials() {
+        return forwardAuthorizationCredentials;
+    }
+    
+    public void setForwardAuthorizationCredentials(boolean forwardAuthorizationCredentials) {
+        this.forwardAuthorizationCredentials = forwardAuthorizationCredentials;
+    }
+    
     public String getBrokerServiceURLTLS() {
         return brokerServiceURLTLS;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 901ea4e4a..921376fc2 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -53,6 +53,8 @@
 
     private ProxyService service;
     String clientAuthRole = null;
+    String clientAuthData = null;
+    String clientAuthMethod = null;
     private State state;
 
     private LookupProxyHandler lookupProxyHandler = null;
@@ -210,6 +212,11 @@ private boolean verifyAuthenticationIfNeeded(CommandConnect connect) {
                 authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
             }
             String authData = connect.getAuthData().toStringUtf8();
+
+            if (service.getConfiguration().forwardAuthorizationCredentials()) {
+                clientAuthData = authData;
+                clientAuthMethod = authMethod;
+            }
             ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
             SSLSession sslSession = null;
             if (sslHandler != null) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
new file mode 100644
index 000000000..2309ebbf3
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthentication;
+import org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyForwardAuthDataTest.class);
+    private int webServicePort;
+    private int servicePort;
+    
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        webServicePort = PortManager.nextFreePort();
+        servicePort = PortManager.nextFreePort();
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        conf.setTlsEnabled(false);
+        conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        conf.setBrokerClientAuthenticationParameters("authParam:broker");
+        conf.setAuthenticateOriginalAuthData(true);
+        
+        Set<String> superUserRoles = new HashSet<String>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        
+        Set<String> providers = new HashSet<String>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("use");
+        Set<String> proxyRoles = new HashSet<String>();
+        proxyRoles.add("proxy");
+        conf.setProxyRoles(proxyRoles);
+
+        super.init();
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();       
+    }
+    
+    @Test
+    void testForwardAuthData() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // Step 1: Create Admin Client
+        createAdminClient();
+        final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
+        // create a client which connects to proxy and pass authData
+        String namespaceName = "my-property/use/my-ns";
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        String subscriptionName = "my-subscriber-name";
+        String clientAuthParams = "authParam:client";
+        String proxyAuthParams = "authParam:proxy";
+        
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace(namespaceName);
+        
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        
+        // Step 2: Run Pulsar Proxy without forwarding authData - expect Exception
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setAuthenticationEnabled(true);
+
+        proxyConfig.setServicePort(servicePort);
+        proxyConfig.setWebServicePort(webServicePort);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        
+        proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+        ProxyService proxyService = new ProxyService(proxyConfig);
+
+        proxyService.start();
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams);
+        Consumer consumer;
+        boolean exceptionOccured = false;
+        try {
+            consumer = proxyClient.subscribe(topicName, subscriptionName);
+        } catch(Exception ex) {
+            exceptionOccured  = true;
+        }         
+        Assert.assertTrue(exceptionOccured);
+        proxyService.close();
+        
+        // Step 3: Create proxy with forwardAuthData enabled
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        proxyService = new ProxyService(proxyConfig);
+
+        proxyService.start();
+        consumer = proxyClient.subscribe(topicName, subscriptionName);   
+        Assert.assertTrue(exceptionOccured);
+        proxyService.close();
+    }
+
+    private void createAdminClient() throws PulsarClientException {
+        String adminAuthParams = "authParam:admin";
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setAuthentication(BasicAuthentication.class.getName(), adminAuthParams);
+
+        admin = spy(new PulsarAdmin(brokerUrl, clientConf));        
+    }
+    
+    private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams) throws PulsarClientException {
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setAuthentication(BasicAuthentication.class.getName(), authParams);
+        return PulsarClient.create(proxyServiceUrl, clientConf);
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 73bf11f43..1c720aa94 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -43,7 +43,6 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,7 +152,6 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
     protected void setup() throws Exception {
         webServicePort = PortManager.nextFreePort();
         servicePort = PortManager.nextFreePort();
-        // enable tls and auth&auth at broker 
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         conf.setTlsEnabled(false);
@@ -205,7 +203,6 @@ void testIncorrectRoles() throws Exception {
         // Step 2: Try to use proxy Client as a normal Client - expect exception
         PulsarClient proxyClient = createPulsarClient("pulsar://localhost:" + BROKER_PORT, proxyAuthParams);
         ConsumerConfiguration consumerConf = new ConsumerConfiguration();
-        consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
         Consumer consumer;
         boolean exceptionOccured = false;
         try {
@@ -216,7 +213,7 @@ void testIncorrectRoles() throws Exception {
         }         
         Assert.assertTrue(exceptionOccured);
         
-        // Step 4: Run Pulsar Proxy and pass proxy params as client params - expect exception
+        // Step 3: Run Pulsar Proxy and pass proxy params as client params - expect exception
         ProxyConfiguration proxyConfig = new ProxyConfiguration();
         proxyConfig.setAuthenticationEnabled(true);
 
@@ -234,6 +231,7 @@ void testIncorrectRoles() throws Exception {
 
         proxyService.start();
         proxyClient = createPulsarClient(proxyServiceUrl, proxyAuthParams);
+        exceptionOccured = false;
         try {
             consumer = proxyClient.subscribe(topicName, subscriptionName,
                 consumerConf);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services