You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 23:39:19 UTC
[pulsar] branch master updated: [fix][broker] Fix passing incorrect authentication data (#16201)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 936bbbcc6a4 [fix][broker] Fix passing incorrect authentication data (#16201)
936bbbcc6a4 is described below
commit 936bbbcc6a4e8cf61547aeedf92e84fb3a089502
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Wed Jun 29 07:39:10 2022 +0800
[fix][broker] Fix passing incorrect authentication data (#16201)
### Motivation
#16065 fixes the race condition issue, but introduces a new issue. This issue is triggered when the Proxy and Broker work together, when we use the proxy to request the broker to do lookup/subscribe/produce operation, the broker always uses the original authentication data for authorization, not proxy authentication data, which causes this issue.
### Modification
- Fix passing authentication data, differentiate between original auth data and connected auth data by avoid to use the `getAuthenticationData()`, this method name is easy to cause confusion and can not correctly get the authentication data
---
.../apache/pulsar/broker/service/ServerCnx.java | 65 +++-
.../broker/service/ServerCnxAuthorizationTest.java | 433 +++++++++++++++++++++
.../pulsar/broker/service/ServerCnxTest.java | 2 +-
3 files changed, 483 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 39b10af6b47..07cf63d679f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -166,14 +166,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final int maxSubscriptionPatternLength;
private State state;
private volatile boolean isActive = true;
- String authRole = null;
+ private String authRole = null;
private volatile AuthenticationDataSource authenticationData;
- AuthenticationProvider authenticationProvider;
- AuthenticationState authState;
+ private AuthenticationProvider authenticationProvider;
+ private AuthenticationState authState;
// In case of proxy, if the authentication credentials are forwardable,
// it will hold the credentials of the original client
- AuthenticationState originalAuthState;
- AuthenticationDataSource originalAuthData;
+ private AuthenticationState originalAuthState;
+ private AuthenticationDataSource originalAuthData;
private boolean pendingAuthChallengeResponse = false;
// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
@@ -382,19 +382,20 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// ////
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation,
- AuthenticationDataSource authData) {
+ AuthenticationDataSource authDataSource, AuthenticationDataSource originalAuthDataSource) {
if (!service.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
- topicName, operation, originalPrincipal, authData);
+ topicName, operation, originalPrincipal,
+ originalAuthDataSource != null ? originalAuthDataSource : authDataSource);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
- topicName, operation, authRole, authData);
+ topicName, operation, authRole, authDataSource);
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}",
@@ -411,9 +412,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName,
TopicOperation operation) {
if (service.isAuthorizationEnabled()) {
- AuthenticationDataSource authData =
- new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName);
- return isTopicOperationAllowed(topicName, operation, authData);
+ AuthenticationDataSource authDataSource =
+ new AuthenticationDataSubscription(authenticationData, subscriptionName);
+ AuthenticationDataSource originalAuthDataSource = null;
+ if (originalAuthData != null) {
+ originalAuthDataSource = new AuthenticationDataSubscription(originalAuthData, subscriptionName);
+ }
+ return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataSource);
} else {
return CompletableFuture.completedFuture(true);
}
@@ -448,7 +453,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
lookupSemaphore.release();
return;
}
- isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
+ isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
@@ -512,7 +517,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
lookupSemaphore.release();
return;
}
- isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
+ isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
@@ -875,6 +880,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
} else {
originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
+
if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role (forwarded from proxy): {}",
remoteAddress, originalPrincipal);
@@ -1200,13 +1206,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
- topicName, TopicOperation.PRODUCE, getAuthenticationData()
+ topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
);
if (!Strings.isNullOrEmpty(initialSubscriptionName)) {
isAuthorizedFuture =
isAuthorizedFuture.thenCombine(
- isTopicOperationAllowed(topicName, TopicOperation.SUBSCRIBE, getAuthenticationData()),
+ isTopicOperationAllowed(topicName, TopicOperation.SUBSCRIBE, authenticationData,
+ originalAuthData),
(canProduce, canSubscribe) -> canProduce && canSubscribe);
}
@@ -1949,7 +1956,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
- namespaceName, operation, originalPrincipal, getAuthenticationData());
+ namespaceName, operation, originalPrincipal, originalAuthData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -2956,6 +2963,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
public String clientSourceAddress() {
+ AuthenticationDataSource authenticationDataSource = this.getAuthData();
if (proxyMessage != null) {
return proxyMessage.sourceAddress();
} else if (remoteAddress instanceof InetSocketAddress) {
@@ -2993,4 +3001,29 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
public boolean hasProducers() {
return !producers.isEmpty();
}
+
+ @VisibleForTesting
+ protected String getOriginalPrincipal() {
+ return originalPrincipal;
+ }
+
+ @VisibleForTesting
+ protected AuthenticationDataSource getAuthData() {
+ return authenticationData;
+ }
+
+ @VisibleForTesting
+ protected AuthenticationDataSource getOriginalAuthData() {
+ return originalAuthData;
+ }
+
+ @VisibleForTesting
+ protected AuthenticationState getOriginalAuthState() {
+ return originalAuthState;
+ }
+
+ @VisibleForTesting
+ protected void setAuthRole(String authRole) {
+ this.authRole = authRole;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
new file mode 100644
index 00000000000..af539891711
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import javax.crypto.SecretKey;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandLookupTopic;
+import org.apache.pulsar.common.api.proto.CommandProducer;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.ZooKeeper;
+import org.mockito.ArgumentMatcher;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ServerCnxAuthorizationTest {
+ private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ private final String CLIENT_PRINCIPAL = "client";
+ private final String PROXY_PRINCIPAL = "proxy";
+ private final String CLIENT_TOKEN = Jwts.builder().setSubject(CLIENT_PRINCIPAL).signWith(SECRET_KEY).compact();
+ private final String PROXY_TOKEN = Jwts.builder().setSubject(PROXY_PRINCIPAL).signWith(SECRET_KEY).compact();
+
+ private PulsarService pulsar;
+ private PulsarResources pulsarResources;
+ private BrokerService brokerService;
+ private ServiceConfiguration svcConfig;
+
+ @BeforeMethod(alwaysRun = true)
+ public void beforeMethod() throws Exception {
+ EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+ svcConfig = spy(ServiceConfiguration.class);
+ svcConfig.setKeepAliveIntervalSeconds(0);
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
+ svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ svcConfig.setClusterName("pulsar-cluster");
+ svcConfig.setSuperUserRoles(Collections.singleton(PROXY_PRINCIPAL));
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+ svcConfig.setAuthorizationEnabled(true);
+ svcConfig.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ Properties properties = new Properties();
+ properties.setProperty("tokenSecretKey", "data:;base64,"
+ + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+ svcConfig.setProperties(properties);
+
+ pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
+ doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
+
+ doReturn(svcConfig).when(pulsar).getConfiguration();
+ doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
+
+ ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
+ doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+
+ ZooKeeper mockZk = createMockZooKeeper();
+ OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(1).build();
+ doReturn(createMockBookKeeper(executor))
+ .when(pulsar).getBookKeeperClient();
+
+ MetadataStore store = new ZKMetadataStore(mockZk);
+
+ doReturn(store).when(pulsar).getLocalMetadataStore();
+ doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+ pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+ NamespaceResources namespaceResources =
+ spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
+ doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
+
+ TenantResources tenantResources = spyWithClassAndConstructorArgs(TenantResources.class, store, 30);
+ doReturn(tenantResources).when(pulsarResources).getTenantResources();
+
+ doReturn(CompletableFuture.completedFuture(Optional.of(TenantInfo.builder().build()))).when(tenantResources)
+ .getTenantAsync("public");
+
+ brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
+ BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
+ doReturn(interceptor).when(brokerService).getInterceptor();
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ doReturn(executor).when(pulsar).getOrderedExecutor();
+ }
+
+ @Test
+ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception {
+ doReturn(true).when(svcConfig).isAuthenticateOriginalAuthData();
+
+ ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+ ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
+ Channel channel = mock(Channel.class);
+ ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
+ doReturn(channelPipeline).when(channel).pipeline();
+ doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
+
+ SocketAddress socketAddress = new InetSocketAddress(0);
+ doReturn(socketAddress).when(channel).remoteAddress();
+ doReturn(channel).when(channelHandlerContext).channel();
+ channelHandlerContext.channel().remoteAddress();
+ serverCnx.channelActive(channelHandlerContext);
+
+ // connect
+ AuthenticationToken clientAuthenticationToken = new AuthenticationToken(CLIENT_TOKEN);
+ AuthenticationToken proxyAuthenticationToken = new AuthenticationToken(PROXY_TOKEN);
+ CommandConnect connect = new CommandConnect();
+ connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
+ connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
+ connect.setClientVersion("test");
+ connect.setProtocolVersion(1);
+ connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
+ connect.setOriginalAuthData(clientAuthenticationToken.getAuthData().getCommandData());
+ connect.setOriginalAuthMethod(clientAuthenticationToken.getAuthMethodName());
+
+ serverCnx.handleConnect(connect);
+ assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
+ clientAuthenticationToken.getAuthData().getCommandData());
+ assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), CLIENT_PRINCIPAL);
+ assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
+ assertEquals(serverCnx.getAuthData().getCommandData(),
+ proxyAuthenticationToken.getAuthData().getCommandData());
+ assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
+ assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
+
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsarResources);
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
+
+ // lookup
+ CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
+ TopicName topicName = TopicName.get("persistent://public/default/test-topic");
+ commandLookupTopic.setTopic(topicName.toString());
+ commandLookupTopic.setRequestId(1);
+ serverCnx.handleLookup(commandLookupTopic);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+ CLIENT_PRINCIPAL,
+ serverCnx.getOriginalAuthData());
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+ PROXY_PRINCIPAL,
+ serverCnx.getAuthData());
+
+ // producer
+ CommandProducer commandProducer = new CommandProducer();
+ commandProducer.setRequestId(1);
+ commandProducer.setProducerId(1);
+ commandProducer.setProducerName("test-producer");
+ commandProducer.setTopic(topicName.toString());
+ serverCnx.handleProducer(commandProducer);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+ CLIENT_PRINCIPAL,
+ serverCnx.getOriginalAuthData());
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+ PROXY_PRINCIPAL,
+ serverCnx.getAuthData());
+
+ // consumer
+ CommandSubscribe commandSubscribe = new CommandSubscribe();
+ commandSubscribe.setTopic(topicName.toString());
+ commandSubscribe.setRequestId(1);
+ commandSubscribe.setConsumerId(1);
+ final String subscriptionName = "test-subscribe";
+ commandSubscribe.setSubscription("test-subscribe");
+ commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
+ serverCnx.handleSubscribe(commandSubscribe);
+
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(CLIENT_PRINCIPAL), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ try {
+ assertEquals(arg.getCommandData(), clientAuthenticationToken.getAuthData().getCommandData());
+ } catch (PulsarClientException e) {
+ fail(e.getMessage());
+ }
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(PROXY_PRINCIPAL), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ try {
+ assertEquals(arg.getCommandData(), proxyAuthenticationToken.getAuthData().getCommandData());
+ } catch (PulsarClientException e) {
+ fail(e.getMessage());
+ }
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ }
+
+ @Test
+ public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() throws Exception {
+ doReturn(false).when(svcConfig).isAuthenticateOriginalAuthData();
+
+ ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+ ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
+ Channel channel = mock(Channel.class);
+ ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
+ doReturn(channelPipeline).when(channel).pipeline();
+ doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
+
+ SocketAddress socketAddress = new InetSocketAddress(0);
+ doReturn(socketAddress).when(channel).remoteAddress();
+ doReturn(channel).when(channelHandlerContext).channel();
+ channelHandlerContext.channel().remoteAddress();
+ serverCnx.channelActive(channelHandlerContext);
+
+ // connect
+ AuthenticationToken proxyAuthenticationToken = new AuthenticationToken(PROXY_TOKEN);
+ CommandConnect connect = new CommandConnect();
+ connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
+ connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
+ connect.setClientVersion("test");
+ connect.setProtocolVersion(1);
+ connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
+ serverCnx.handleConnect(connect);
+ assertNull(serverCnx.getOriginalAuthData());
+ assertNull(serverCnx.getOriginalAuthState());
+ assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
+ assertEquals(serverCnx.getAuthData().getCommandData(),
+ proxyAuthenticationToken.getAuthData().getCommandData());
+ assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
+ assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
+
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsarResources);
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
+
+ // lookup
+ CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
+ TopicName topicName = TopicName.get("persistent://public/default/test-topic");
+ commandLookupTopic.setTopic(topicName.toString());
+ commandLookupTopic.setRequestId(1);
+ serverCnx.handleLookup(commandLookupTopic);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+ CLIENT_PRINCIPAL,
+ serverCnx.getAuthData());
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+ PROXY_PRINCIPAL,
+ serverCnx.getAuthData());
+
+ // producer
+ CommandProducer commandProducer = new CommandProducer();
+ commandProducer.setRequestId(1);
+ commandProducer.setProducerId(1);
+ commandProducer.setProducerName("test-producer");
+ commandProducer.setTopic(topicName.toString());
+ serverCnx.handleProducer(commandProducer);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+ CLIENT_PRINCIPAL,
+ serverCnx.getAuthData());
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+ PROXY_PRINCIPAL,
+ serverCnx.getAuthData());
+
+ // consumer
+ CommandSubscribe commandSubscribe = new CommandSubscribe();
+ commandSubscribe.setTopic(topicName.toString());
+ commandSubscribe.setRequestId(1);
+ commandSubscribe.setConsumerId(1);
+ final String subscriptionName = "test-subscribe";
+ commandSubscribe.setSubscription("test-subscribe");
+ commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
+ serverCnx.handleSubscribe(commandSubscribe);
+
+ ArgumentMatcher<AuthenticationDataSource> authenticationDataSourceArgumentMatcher = arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ try {
+ assertEquals(arg.getCommandData(), proxyAuthenticationToken.getAuthData().getCommandData());
+ } catch (PulsarClientException e) {
+ fail(e.getMessage());
+ }
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ };
+
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(CLIENT_PRINCIPAL), argThat(authenticationDataSourceArgumentMatcher));
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(PROXY_PRINCIPAL), argThat(authenticationDataSourceArgumentMatcher));
+ }
+
+ @Test
+ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exception {
+ ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+
+ ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
+ Channel channel = mock(Channel.class);
+ ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
+ doReturn(channelPipeline).when(channel).pipeline();
+ doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
+
+ SocketAddress socketAddress = new InetSocketAddress(0);
+ doReturn(socketAddress).when(channel).remoteAddress();
+ doReturn(channel).when(channelHandlerContext).channel();
+ channelHandlerContext.channel().remoteAddress();
+ serverCnx.channelActive(channelHandlerContext);
+
+ // connect
+ AuthenticationToken clientAuthenticationToken = new AuthenticationToken(CLIENT_TOKEN);
+ CommandConnect connect = new CommandConnect();
+ connect.setAuthMethodName(clientAuthenticationToken.getAuthMethodName());
+ connect.setAuthData(clientAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
+ connect.setClientVersion("test");
+ connect.setProtocolVersion(1);
+ serverCnx.handleConnect(connect);
+ assertNull(serverCnx.getOriginalAuthData());
+ assertNull(serverCnx.getOriginalAuthState());
+ assertNull(serverCnx.getOriginalPrincipal());
+ assertEquals(serverCnx.getAuthData().getCommandData(),
+ clientAuthenticationToken.getAuthData().getCommandData());
+ assertEquals(serverCnx.getAuthRole(), CLIENT_PRINCIPAL);
+ assertEquals(serverCnx.getAuthState().getAuthRole(), CLIENT_PRINCIPAL);
+
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsarResources);
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
+
+ // lookup
+ CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
+ TopicName topicName = TopicName.get("persistent://public/default/test-topic");
+ commandLookupTopic.setTopic(topicName.toString());
+ commandLookupTopic.setRequestId(1);
+ serverCnx.handleLookup(commandLookupTopic);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
+ CLIENT_PRINCIPAL,
+ serverCnx.getAuthData());
+
+ // producer
+ CommandProducer commandProducer = new CommandProducer();
+ commandProducer.setRequestId(1);
+ commandProducer.setProducerId(1);
+ commandProducer.setProducerName("test-producer");
+ commandProducer.setTopic(topicName.toString());
+ serverCnx.handleProducer(commandProducer);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
+ CLIENT_PRINCIPAL,
+ serverCnx.getAuthData());
+
+ // consumer
+ CommandSubscribe commandSubscribe = new CommandSubscribe();
+ commandSubscribe.setTopic(topicName.toString());
+ commandSubscribe.setRequestId(1);
+ commandSubscribe.setConsumerId(1);
+ final String subscriptionName = "test-subscribe";
+ commandSubscribe.setSubscription("test-subscribe");
+ commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
+ serverCnx.handleSubscribe(commandSubscribe);
+
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
+ eq(CLIENT_PRINCIPAL), argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ try {
+ assertEquals(arg.getCommandData(), clientAuthenticationToken.getAuthData().getCommandData());
+ } catch (PulsarClientException e) {
+ fail(e.getMessage());
+ }
+ assertEquals(arg.getSubscription(), subscriptionName);
+ return true;
+ }));
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index cffc82bcf73..1efc2576a2e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1605,7 +1605,7 @@ public class ServerCnxTest {
channel.close().get();
}
serverCnx = new ServerCnx(pulsar);
- serverCnx.authRole = "";
+ serverCnx.setAuthRole("");
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
MaxMessageSize,
0,