You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/12 19:35:54 UTC
[incubator-pulsar] branch master updated: Add pluggable
authorization mechanism (#1200)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb1c61d Add pluggable authorization mechanism (#1200)
fb1c61d is described below
commit fb1c61d1520fa89f98a7c4582129bc39d42c7d8e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Feb 12 11:35:51 2018 -0800
Add pluggable authorization mechanism (#1200)
* Add pluggable authorization service
fix: move FutureUtils to common and fix import
add grantpermission api
take default auth method
pass authData to authorization provider
keep single authorization provider
* fix rebase change
---
conf/broker.conf | 3 +
conf/discovery.conf | 3 +
conf/proxy.conf | 3 +
conf/standalone.conf | 3 +
conf/websocket.conf | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 11 +
.../authorization/AuthorizationProvider.java | 120 ++++
.../broker/authorization/AuthorizationService.java | 265 ++++++++
...nager.java => PulsarAuthorizationProvider.java} | 200 ++++--
.../broker/cache/ConfigurationCacheService.java | 4 +
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../org/apache/pulsar/broker/admin/Namespaces.java | 46 +-
.../pulsar/broker/admin/NonPersistentTopics.java | 2 +-
.../pulsar/broker/admin/PersistentTopics.java | 12 +-
.../broker/loadbalance/impl/LoadManagerShared.java | 2 +-
.../pulsar/broker/lookup/DestinationLookup.java | 5 +-
.../pulsar/broker/namespace/OwnershipCache.java | 2 +-
.../pulsar/broker/service/BacklogQuotaManager.java | 2 +-
.../pulsar/broker/service/BrokerService.java | 14 +-
.../org/apache/pulsar/broker/service/Consumer.java | 8 +-
.../org/apache/pulsar/broker/service/Producer.java | 8 +-
.../apache/pulsar/broker/service/ServerCnx.java | 37 +-
.../service/nonpersistent/NonPersistentTopic.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/broker/web/AuthenticationFilter.java | 5 +
.../pulsar/broker/web/PulsarWebResource.java | 16 +-
.../pulsar/broker/auth/AuthorizationTest.java | 152 ++---
.../broker/auth/MockedPulsarServiceBaseTest.java | 4 +
.../AntiAffinityNamespaceGroupTest.java | 5 +-
.../lookup/http/HttpDestinationLookupv2Test.java | 8 +-
.../pulsar/broker/service/BatchMessageTest.java | 2 +-
.../broker/service/PersistentFailoverE2ETest.java | 2 +-
.../broker/service/PersistentQueueE2ETest.java | 2 +-
.../pulsar/broker/service/ReplicatorTestBase.java | 2 +-
.../pulsar/broker/service/ServerCnxTest.java | 67 +-
.../broker/service/TopicTerminationTest.java | 2 +-
.../api/AuthorizationProducerConsumerTest.java | 384 ++++++++++++
.../pulsar/client/api/BrokerServiceLookupTest.java | 2 +-
.../client/api/SimpleProducerConsumerTest.java | 2 +-
.../client/impl/BrokerClientIntegrationTest.java | 116 ++--
.../websocket/proxy/ProxyAuthorizationTest.java | 28 +-
.../pulsar/client/admin/internal/BaseResource.java | 2 +-
.../clients/consumer/PulsarKafkaConsumer.java | 2 +-
.../pulsar/client/api/ClientConfiguration.java | 1 +
.../org/apache/pulsar/client/impl/ClientCnx.java | 4 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 2 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../org/apache/pulsar/client/impl/HttpClient.java | 18 +-
.../pulsar/client/impl/HttpLookupService.java | 7 +-
.../client/impl/PartitionedConsumerImpl.java | 2 +-
.../client/impl/PartitionedProducerImpl.java | 2 +-
.../pulsar/client/impl/PulsarClientImpl.java | 4 +-
.../apache/pulsar/common/api/proto/PulsarApi.java | 680 ++++++++++-----------
.../org/apache/pulsar/common}/util/FutureUtil.java | 10 +-
.../discovery/service/BrokerDiscoveryProvider.java | 10 +-
.../pulsar/discovery/service/DiscoveryService.java | 10 +-
.../pulsar/discovery/service/ServerConnection.java | 9 +-
.../discovery/service/server/ServiceConfig.java | 11 +
.../discovery/service/DiscoveryServiceTest.java | 4 +-
.../proxy/server/BrokerDiscoveryProvider.java | 13 +-
.../pulsar/proxy/server/LookupProxyHandler.java | 3 +-
.../pulsar/proxy/server/ProxyConfiguration.java | 13 +-
.../pulsar/proxy/server/ProxyConnection.java | 7 +-
.../apache/pulsar/proxy/server/ProxyService.java | 10 +-
.../pulsar/testclient/PerformanceReader.java | 2 +-
.../pulsar/websocket/AbstractWebSocketHandler.java | 7 +-
.../apache/pulsar/websocket/ConsumerHandler.java | 6 +-
.../apache/pulsar/websocket/ProducerHandler.java | 11 +-
.../org/apache/pulsar/websocket/ReaderHandler.java | 6 +-
.../apache/pulsar/websocket/WebSocketService.java | 12 +-
.../websocket/admin/WebSocketWebResource.java | 18 +-
.../service/WebSocketProxyConfiguration.java | 13 +
72 files changed, 1682 insertions(+), 777 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 609cddd..d18ebac 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -203,6 +203,9 @@ authenticationProviders=
# Enforce authorization
authorizationEnabled=false
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
# Allow wildcard matching in authorization
# (wildcard matching only applicable if wildcard-char:
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
diff --git a/conf/discovery.conf b/conf/discovery.conf
index 7dc3f63..49f499a 100644
--- a/conf/discovery.conf
+++ b/conf/discovery.conf
@@ -52,6 +52,9 @@ authenticationProviders=
# Enforce authorization
authorizationEnabled=false
+# Authorization provider name list, which is comma separated list of class names
+authorizationProviders=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics (comma-separated)
superUserRoles=
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 78ad925..d7c5afc 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -49,6 +49,9 @@ authenticationProviders=
# Enforce authorization
authorizationEnabled=false
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
# Authentication settings of the proxy itself. Used to connect to brokers
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
diff --git a/conf/standalone.conf b/conf/standalone.conf
index d2533e1..1201dce 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -169,6 +169,9 @@ authenticationProviders=
# Enforce authorization
authorizationEnabled=false
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
# Allow wildcard matching in authorization
# (wildcard matching only applicable if wildcard-char:
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
diff --git a/conf/websocket.conf b/conf/websocket.conf
index cf5135d..404bdef 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -59,6 +59,9 @@ authenticationProviders=
# Enforce authorization
authorizationEnabled=false
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
# Allow wildcard matching in authorization
# (wildcard matching only applicable if wildcard-char:
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index de27fd9..74077f4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -188,6 +189,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Enforce authorization
private boolean authorizationEnabled = false;
+ // Authorization provider fully qualified class-name
+ private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
// Role names that are treated as "super-user", meaning they will be able to
// do all admin operations and publish/consume from all topics
@@ -783,6 +786,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.authenticationEnabled = authenticationEnabled;
}
+ public String getAuthorizationProvider() {
+ return authorizationProvider;
+ }
+
+ public void setAuthorizationProvider(String authorizationProvider) {
+ this.authorizationProvider = authorizationProvider;
+ }
+
public void setAuthenticationProviders(Set<String> providersClassNames) {
authenticationProviders = providersClassNames;
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
new file mode 100644
index 0000000..9962c05
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+
+/**
+ * Provider of authorization mechanism
+ */
+public interface AuthorizationProvider extends Closeable {
+
+ /**
+ * Perform initialization for the authorization provider
+ *
+ * @param config
+ * broker config object
+ * @param configCache
+ * pulsar zk configuration cache service
+ * @throws IOException
+ * if the initialization fails
+ */
+ void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
+
+ /**
+ * Check if the specified role has permission to send messages to the specified fully qualified destination name.
+ *
+ * @param destination
+ * the fully qualified destination name associated with the destination.
+ * @param role
+ * the app id used to send messages to the destination.
+ */
+ CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData);
+
+ /**
+ * Check if the specified role has permission to receive messages from the specified fully qualified destination
+ * name.
+ *
+ * @param destination
+ * the fully qualified destination name associated with the destination.
+ * @param role
+ * the app id used to receive messages from the destination.
+ * @param subscription
+ * the subscription name defined by the client
+ */
+ CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData, String subscription);
+
+ /**
+ * Check whether the specified role can perform a lookup for the specified destination.
+ *
+ * For that the caller needs to have producer or consumer permission.
+ *
+ * @param destination
+ * @param role
+ * @return
+ * @throws Exception
+ */
+ CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData);
+
+ /**
+ *
+ * Grant authorization-action permission on a namespace to the given client
+ *
+ * @param namespace
+ * @param actions
+ * @param role
+ * @param authDataJson
+ * additional authdata in json format
+ * @return CompletableFuture
+ * @completesWith <br/>
+ * IllegalArgumentException when namespace not found<br/>
+ * IllegalStateException when failed to grant permission
+ */
+ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+ String authDataJson);
+
+ /**
+ * Grant authorization-action permission on a topic to the given client
+ *
+ * @param topicname
+ * @param role
+ * @param authDataJson
+ * additional authdata in json format
+ * @return CompletableFuture
+ * @completesWith <br/>
+ * IllegalArgumentException when namespace not found<br/>
+ * IllegalStateException when failed to grant permission
+ */
+ CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions, String role,
+ String authDataJson);
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
new file mode 100644
index 0000000..8cb9232
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Authorization service that manages pluggable authorization provider and authorize requests accordingly.
+ *
+ */
+public class AuthorizationService {
+ private static final Logger log = LoggerFactory.getLogger(AuthorizationService.class);
+
+ private AuthorizationProvider provider;
+ private final ServiceConfiguration conf;
+
+ public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService configCache)
+ throws PulsarServerException {
+
+ this.conf = conf;
+ if (this.conf.isAuthorizationEnabled()) {
+ try {
+ final String providerClassname = conf.getAuthorizationProvider();
+ if(StringUtils.isNotBlank(providerClassname)) {
+ provider = (AuthorizationProvider) Class.forName(providerClassname).newInstance();
+ provider.initialize(conf, configCache);
+ log.info("{} has been loaded.", providerClassname);
+ } else {
+ throw new PulsarServerException("No authorization providers are present.");
+ }
+ } catch (PulsarServerException e) {
+ throw e;
+ }catch (Throwable e) {
+ throw new PulsarServerException("Failed to load an authorization provider.", e);
+ }
+ } else {
+ log.info("Authorization is disabled");
+ }
+ }
+
+ /**
+ *
+ * Grant authorization-action permission on a namespace to the given client
+ *
+ * @param namespace
+ * @param actions
+ * @param role
+ * @param authDataJson
+ * additional authdata in json for targeted authorization provider
+ * @return
+ * @throws IllegalArgumentException
+ * when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+ String authDataJson) {
+
+ if (provider != null) {
+ return provider.grantPermissionAsync(namespace, actions, role, authDataJson);
+ }
+ return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
+ }
+
+ /**
+ * Grant authorization-action permission on a topic to the given client
+ *
+ * @param topicname
+ * @param role
+ * @param authDataJson
+ * additional authdata in json for targeted authorization provider
+ * @return IllegalArgumentException when namespace not found
+ * @throws IllegalStateException
+ * when failed to grant permission
+ */
+ public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions, String role,
+ String authDataJson) {
+
+ if (provider != null) {
+ return provider.grantPermissionAsync(topicname, actions, role, authDataJson);
+ }
+ return FutureUtil
+ .failedFuture(new IllegalStateException("No authorization provider configured"));
+
+ }
+
+ /**
+ * Check if the specified role has permission to send messages to the specified fully qualified destination name.
+ *
+ * @param destination
+ * the fully qualified destination name associated with the destination.
+ * @param role
+ * the app id used to send messages to the destination.
+ */
+ public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.canProduceAsync(destination, role, authenticationData);
+ }
+ return FutureUtil
+ .failedFuture(new IllegalStateException("No authorization provider configured"));
+ }
+
+ /**
+ * Check if the specified role has permission to receive messages from the specified fully qualified destination
+ * name.
+ *
+ * @param destination
+ * the fully qualified destination name associated with the destination.
+ * @param role
+ * the app id used to receive messages from the destination.
+ * @param subscription
+ * the subscription name defined by the client
+ */
+ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+ if (provider != null) {
+ return provider.canConsumeAsync(destination, role, authenticationData, subscription);
+ }
+ return FutureUtil
+ .failedFuture(new IllegalStateException("No authorization provider configured"));
+ }
+
+ public boolean canProduce(DestinationName destination, String role, AuthenticationDataSource authenticationData) throws Exception {
+ try {
+ return canProduceAsync(destination, role, authenticationData).get(cacheTimeOutInSec,
+ SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
+ throw e;
+ } catch (Exception e) {
+ log.warn("Producer-client with Role - {} failed to get permissions for destination - {}. {}", role,
+ destination, e.getMessage());
+ throw e;
+ }
+ }
+
+ public boolean canConsume(DestinationName destination, String role, AuthenticationDataSource authenticationData,
+ String subscription) throws Exception {
+ try {
+ return canConsumeAsync(destination, role, authenticationData, subscription)
+ .get(cacheTimeOutInSec, SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
+ throw e;
+ } catch (Exception e) {
+ log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}. {}", role,
+ destination, e.getMessage());
+ throw e;
+ }
+ }
+
+ /**
+ * Check whether the specified role can perform a lookup for the specified destination.
+ *
+ * For that the caller needs to have producer or consumer permission.
+ *
+ * @param destination
+ * @param role
+ * @return
+ * @throws Exception
+ */
+ public boolean canLookup(DestinationName destination, String role, AuthenticationDataSource authenticationData) throws Exception {
+ return canProduce(destination, role, authenticationData)
+ || canConsume(destination, role, authenticationData, null);
+ }
+
+ /**
+ * Check whether the specified role can perform a lookup for the specified destination.
+ *
+ * For that the caller needs to have producer or consumer permission.
+ *
+ * @param destination
+ * @param role
+ * @return
+ * @throws Exception
+ */
+ public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+ CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
+ canProduceAsync(destination, role, authenticationData)
+ .whenComplete((produceAuthorized, ex) -> {
+ if (ex == null) {
+ if (produceAuthorized) {
+ finalResult.complete(produceAuthorized);
+ return;
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
+ destination.toString(), role, ex.getMessage());
+ }
+ }
+ canConsumeAsync(destination, role, null, null)
+ .whenComplete((consumeAuthorized, e) -> {
+ if (e == null) {
+ if (consumeAuthorized) {
+ finalResult.complete(consumeAuthorized);
+ return;
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
+ destination.toString(), role, e.getMessage());
+
+ }
+ finalResult.completeExceptionally(e);
+ return;
+ }
+ finalResult.complete(false);
+ });
+ });
+ return finalResult;
+ }
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
similarity index 63%
rename from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
rename to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 0950ae2..dc0ba83 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -18,36 +18,59 @@
*/
package org.apache.pulsar.broker.authorization;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+
+import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.Policies;
+import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * Default authorization provider that stores authorization policies under local-zookeeper.
+ *
*/
-public class AuthorizationManager {
- private static final Logger log = LoggerFactory.getLogger(AuthorizationManager.class);
+public class PulsarAuthorizationProvider implements AuthorizationProvider {
+ private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class);
- public final ServiceConfiguration conf;
- public final ConfigurationCacheService configCache;
+ public ServiceConfiguration conf;
+ public ConfigurationCacheService configCache;
private static final String POLICY_ROOT = "/admin/policies/";
+ private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
+
+ public PulsarAuthorizationProvider() {
+ }
+
+ public PulsarAuthorizationProvider(ServiceConfiguration conf, ConfigurationCacheService configCache)
+ throws IOException {
+ initialize(conf, configCache);
+ }
- public AuthorizationManager(ServiceConfiguration conf, ConfigurationCacheService configCache) {
+ @Override
+ public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ checkNotNull(conf, "ServiceConfiguration can't be null");
+ checkNotNull(configCache, "ConfigurationCacheService can't be null");
this.conf = conf;
this.configCache = configCache;
+
}
/**
@@ -58,23 +81,12 @@ public class AuthorizationManager {
* @param role
* the app id used to send messages to the destination.
*/
- public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role) {
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
return checkAuthorization(destination, role, AuthAction.produce);
}
- public boolean canProduce(DestinationName destination, String role) throws Exception {
- try {
- return canProduceAsync(destination, role).get(cacheTimeOutInSec, SECONDS);
- } catch (InterruptedException e) {
- log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
- throw e;
- } catch (Exception e) {
- log.warn("Producer-client with Role - {} failed to get permissions for destination - {}. {}", role,
- destination, e.getMessage());
- throw e;
- }
- }
-
/**
* Check if the specified role has permission to receive messages from the specified fully qualified destination
* name.
@@ -86,7 +98,9 @@ public class AuthorizationManager {
* @param subscription
* the subscription name defined by the client
*/
- public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role, String subscription) {
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
@@ -115,30 +129,19 @@ public class AuthorizationManager {
permissionFuture.complete(isAuthorized);
});
}).exceptionally(ex -> {
- log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, ex.getMessage());
+ log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+ ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
- log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
+ log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+ e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
}
- public boolean canConsume(DestinationName destination, String role, String subscription) throws Exception {
- try {
- return canConsumeAsync(destination, role, subscription).get(cacheTimeOutInSec, SECONDS);
- } catch (InterruptedException e) {
- log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
- throw e;
- } catch (Exception e) {
- log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}. {}", role,
- destination, e.getMessage());
- throw e;
- }
- }
-
/**
* Check whether the specified role can perform a lookup for the specified destination.
*
@@ -149,23 +152,11 @@ public class AuthorizationManager {
* @return
* @throws Exception
*/
- public boolean canLookup(DestinationName destination, String role) throws Exception {
- return canProduce(destination, role) || canConsume(destination, role, null);
- }
-
- /**
- * Check whether the specified role can perform a lookup for the specified destination.
- *
- * For that the caller needs to have producer or consumer permission.
- *
- * @param destination
- * @param role
- * @return
- * @throws Exception
- */
- public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role) {
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
- canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> {
+ canProduceAsync(destination, role, authenticationData).whenComplete((produceAuthorized, ex) -> {
if (ex == null) {
if (produceAuthorized) {
finalResult.complete(produceAuthorized);
@@ -178,7 +169,7 @@ public class AuthorizationManager {
destination.toString(), role, ex.getMessage());
}
}
- canConsumeAsync(destination, role, null).whenComplete((consumeAuthorized, e) -> {
+ canConsumeAsync(destination, role, authenticationData, null).whenComplete((consumeAuthorized, e) -> {
if (e == null) {
if (consumeAuthorized) {
finalResult.complete(consumeAuthorized);
@@ -200,6 +191,61 @@ public class AuthorizationManager {
return finalResult;
}
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(DestinationName destination, Set<AuthAction> actions,
+ String role, String authDataJson) {
+ return grantPermissionAsync(destination.getNamespaceObject(), actions, role, authDataJson);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName, Set<AuthAction> actions,
+ String role, String authDataJson) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+
+ try {
+ validatePoliciesReadOnlyAccess();
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+
+ ZooKeeper globalZk = configCache.getZooKeeper();
+ final String property = namespaceName.getProperty();
+ final String cluster = namespaceName.getCluster();
+ final String namespace = namespaceName.getLocalName();
+ final String policiesPath = String.format("/%s/%s/%s/%s/%s", "admin", POLICIES, property, cluster, namespace);
+
+ try {
+ Stat nodeStat = new Stat();
+ byte[] content = globalZk.getData(policiesPath, null, nodeStat);
+ Policies policies = getThreadLocal().readValue(content, Policies.class);
+ policies.auth_policies.namespace_auth.put(role, actions);
+
+ // Write back the new policies into zookeeper
+ globalZk.setData(policiesPath, getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());
+
+ configCache.policiesCache().invalidate(policiesPath);
+
+ log.info("[{}] Successfully granted access for role {}: {} - namespace {}/{}/{}", role, role, actions,
+ property, cluster, namespace);
+ result.complete(null);
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", role, property, cluster,
+ namespace);
+ result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace));
+ } catch (KeeperException.BadVersionException e) {
+ log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification", role, property,
+ cluster, namespace);
+ result.completeExceptionally(new IllegalStateException(
+ "Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to get permissions for namespace {}/{}/{}", role, property, cluster, namespace, e);
+ result.completeExceptionally(
+ new IllegalStateException("Failed to get permissions for namespace " + namespace));
+ }
+
+ return result;
+ }
+
private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role, AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
@@ -273,7 +319,8 @@ public class AuthorizationManager {
return null;
});
} catch (Exception e) {
- log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
+ log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+ e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
@@ -312,4 +359,39 @@ public class AuthorizationManager {
return role != null && superUserRoles.contains(role) ? true : false;
}
+ @Override
+ public void close() throws IOException {
+ // No-op
+ }
+
+ private void validatePoliciesReadOnlyAccess() {
+ boolean arePoliciesReadOnly = true;
+ ZooKeeperCache globalZkCache = configCache.cache();
+
+ try {
+ arePoliciesReadOnly = globalZkCache.exists(POLICIES_READONLY_FLAG_PATH);
+ } catch (Exception e) {
+ log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e);
+ throw new IllegalStateException("Unable to fetch content from global zk");
+ }
+
+ if (arePoliciesReadOnly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot do read-write operations");
+ }
+ throw new IllegalStateException("policies are in readonly mode");
+ } else {
+ // Make sure the broker is connected to the global zookeeper before writing. If not, throw an exception.
+ if (globalZkCache.getZooKeeper().getState() != States.CONNECTED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Broker is not connected to the global zookeeper");
+ }
+ throw new IllegalStateException("not connected woith global zookeeper");
+ } else {
+ // Do nothing, just log the message.
+ log.debug("Broker is allowed to make read-write operations");
+ }
+ }
+ }
+
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 41a8f93..4ee2865 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -164,6 +164,10 @@ public class ConfigurationCacheService {
}
+ public ZooKeeperCache cache() {
+ return cache;
+ }
+
public ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
return this.propertiesCache;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6cd56ab..51e6ace 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -53,12 +53,12 @@ import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
index cb8f982..e2467f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
@@ -27,7 +27,6 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
import java.net.URI;
import java.net.URL;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -36,6 +35,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@@ -62,7 +62,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -79,6 +78,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
@@ -451,34 +451,30 @@ public class Namespaces extends AdminResource {
public void grantPermissionOnNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("role") String role, Set<AuthAction> actions) {
validateAdminAccessOnProperty(property);
- validatePoliciesReadOnlyAccess();
+ NamespaceName namespaceName = NamespaceName.get(property, cluster, namespace);
try {
- Stat nodeStat = new Stat();
- byte[] content = globalZk().getData(path(POLICIES, property, cluster, namespace), null, nodeStat);
- Policies policies = jsonMapper().readValue(content, Policies.class);
- policies.auth_policies.namespace_auth.put(role, actions);
-
- // Write back the new policies into zookeeper
- globalZk().setData(path(POLICIES, property, cluster, namespace), jsonMapper().writeValueAsBytes(policies),
- nodeStat.getVersion());
-
- policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
-
- log.info("[{}] Successfully granted access for role {}: {} - namespace {}/{}/{}", clientAppId(), role,
- actions, property, cluster, namespace);
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", clientAppId(), property,
- cluster, namespace);
- throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
- } catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification", clientAppId(),
- property, cluster, namespace);
- throw new RestException(Status.CONFLICT, "Concurrent modification");
- } catch (Exception e) {
+ pulsar().getBrokerService().getAuthorizationService()
+ .grantPermissionAsync(namespaceName, actions, role, null/*additional auth-data json*/)
+ .get();
+ } catch (InterruptedException e) {
log.error("[{}] Failed to get permissions for namespace {}/{}/{}", clientAppId(), property, cluster,
namespace, e);
throw new RestException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", clientAppId(),
+ property, cluster, namespace);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ } else if (e.getCause() instanceof IllegalStateException) {
+ log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification",
+ clientAppId(), property, cluster, namespace);
+ throw new RestException(Status.CONFLICT, "Concurrent modification");
+ } else {
+ log.error("[{}] Failed to get permissions for namespace {}/{}/{}", clientAppId(), property, cluster,
+ namespace, e);
+ throw new RestException(e);
+ }
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java
index f765934..68b4e83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java
@@ -42,7 +42,6 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -51,6 +50,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index a3b246e..484a873 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -58,15 +58,14 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
-import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
@@ -84,7 +83,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -101,8 +99,8 @@ import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -271,7 +269,7 @@ public class PersistentTopics extends AdminResource {
validateAdminAccessOnProperty(destination.getProperty());
} catch (Exception ve) {
try {
- checkAuthorization(pulsar(), destination, clientAppId());
+ checkAuthorization(pulsar(), destination, clientAppId(), clientAuthData());
} catch (RestException re) {
throw re;
} catch (Exception e) {
@@ -1366,12 +1364,12 @@ public class PersistentTopics extends AdminResource {
}
public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
- String clientAppId, DestinationName dn) {
+ String clientAppId, AuthenticationDataSource authenticationData, DestinationName dn) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
- checkAuthorization(pulsar, dn, clientAppId);
+ checkAuthorization(pulsar, dn, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 2658a95..19d3f34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -42,11 +42,11 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java
index a666166..0393d6a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java
@@ -42,6 +42,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
@@ -198,7 +199,7 @@ public class DestinationLookup extends PulsarWebResource {
* @return
*/
public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pulsarService, DestinationName fqdn, boolean authoritative,
- String clientAppId, long requestId) {
+ String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
@@ -217,7 +218,7 @@ public class DestinationLookup extends PulsarWebResource {
} else {
// (2) authorize client
try {
- checkAuthorization(pulsarService, fqdn, clientAppId);
+ checkAuthorization(pulsarService, fqdn, clientAppId, authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, fqdn.toString());
validationFuture.complete(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 48c7e71..e681fe7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -29,10 +29,10 @@ import java.util.concurrent.Executor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 3223a77..556a1f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -27,12 +27,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 74ca915..2f3b27a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -63,7 +63,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -83,7 +83,6 @@ import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
@@ -99,6 +98,7 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -153,7 +153,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ConcurrentLinkedQueue<Pair<String, CompletableFuture<Topic>>> pendingTopicLoadingQueue;
- private AuthorizationManager authorizationManager = null;
+ private AuthorizationService authorizationService = null;
private final ScheduledExecutorService statsUpdater;
private final ScheduledExecutorService backlogQuotaChecker;
@@ -210,7 +210,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
if (pulsar.getConfiguration().isAuthorizationEnabled()) {
- this.authorizationManager = new AuthorizationManager(pulsar.getConfiguration(),
+ this.authorizationService = new AuthorizationService(pulsar.getConfiguration(),
pulsar.getConfigurationCache());
}
@@ -896,8 +896,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return result;
}
- public AuthorizationManager getAuthorizationManager() {
- return authorizationManager;
+ public AuthorizationService getAuthorizationService() {
+ return authorizationService;
}
public void removeTopicFromCache(String topic) {
@@ -983,7 +983,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
public boolean isAuthorizationEnabled() {
- return authorizationManager != null;
+ return authorizationService != null;
}
public int getKeepAliveIntervalSeconds() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 77c36e6..fd77ef2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -65,6 +66,7 @@ public class Consumer {
private final SubType subType;
private final ServerCnx cnx;
private final String appId;
+ private AuthenticationDataSource authenticationData;
private final String topicName;
private final long consumerId;
@@ -116,6 +118,7 @@ public class Consumer {
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.appId = appId;
+ this.authenticationData = cnx.authenticationData;
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
@@ -464,9 +467,10 @@ public class Consumer {
public void checkPermissions() {
DestinationName destination = DestinationName.get(subscription.getDestination());
- if (cnx.getBrokerService().getAuthorizationManager() != null) {
+ if (cnx.getBrokerService().getAuthorizationService() != null) {
try {
- if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId, subscription.getName())) {
+ if (cnx.getBrokerService().getAuthorizationService().canConsume(destination, appId, authenticationData,
+ subscription.getName())) {
return;
}
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index e964598..c0de7a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -64,6 +65,7 @@ public class Producer {
private Rate msgIn;
// it records msg-drop rate only for non-persistent topic
private final Rate msgDrop;
+ private AuthenticationDataSource authenticationData;
private volatile long pendingPublishAcks = 0;
private static final AtomicLongFieldUpdater<Producer> pendingPublishAcksUpdater = AtomicLongFieldUpdater
@@ -88,6 +90,7 @@ public class Producer {
this.producerName = checkNotNull(producerName);
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
+ this.authenticationData = cnx.authenticationData;
this.msgIn = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
@@ -466,9 +469,10 @@ public class Producer {
public void checkPermissions() {
DestinationName destination = DestinationName.get(topic.getName());
- if (cnx.getBrokerService().getAuthorizationManager() != null) {
+ if (cnx.getBrokerService().getAuthorizationService() != null) {
try {
- if (cnx.getBrokerService().getAuthorizationManager().canProduce(destination, appId)) {
+ if (cnx.getBrokerService().getAuthorizationService().canProduce(destination, appId,
+ authenticationData)) {
return;
}
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 05fb749..b50eee6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.web.RestException;
@@ -95,6 +96,7 @@ public class ServerCnx extends PulsarHandler {
private State state;
private volatile boolean isActive = true;
String authRole = null;
+ AuthenticationDataSource authenticationData;
// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write spikes on the broker.
@@ -244,16 +246,16 @@ public class ServerCnx extends PulsarHandler {
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationManager()
- .canLookupAsync(topicName, authRole);
+ isProxyAuthorizedFuture = service.getAuthorizationService().canLookupAsync(topicName, authRole,
+ authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
String finalOriginalPrincipal = originalPrincipal;
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
if (isProxyAuthorized) {
- lookupDestinationAsync(getBrokerService().pulsar(), topicName,
- lookup.getAuthoritative(), finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole,
+ lookupDestinationAsync(getBrokerService().pulsar(), topicName, lookup.getAuthoritative(),
+ finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
lookup.getRequestId()).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
@@ -331,8 +333,8 @@ public class ServerCnx extends PulsarHandler {
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationManager()
- .canLookupAsync(topicName, authRole);
+ isProxyAuthorizedFuture = service.getAuthorizationService()
+ .canLookupAsync(topicName, authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -340,8 +342,8 @@ public class ServerCnx extends PulsarHandler {
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
if (isProxyAuthorized) {
getPartitionedTopicMetadata(getBrokerService().pulsar(),
- finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, topicName)
- .handle((metadata, ex) -> {
+ finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
+ topicName).handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
@@ -497,8 +499,9 @@ public class ServerCnx extends PulsarHandler {
connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
sslSession);
+ authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
authRole = getBrokerService().getAuthenticationService()
- .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
+ .authenticate(authenticationData, authMethod);
log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
} catch (AuthenticationException e) {
@@ -554,8 +557,8 @@ public class ServerCnx extends PulsarHandler {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(topicName, authRole,
- subscribe.getSubscription());
+ isProxyAuthorizedFuture = service.getAuthorizationService().canConsumeAsync(topicName, authRole,
+ authenticationData, subscribe.getSubscription());
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -563,8 +566,9 @@ public class ServerCnx extends PulsarHandler {
if (isProxyAuthorized) {
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationManager().canConsumeAsync(topicName,
- originalPrincipal != null ? originalPrincipal : authRole, subscriptionName);
+ authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
+ originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
+ subscribe.getSubscription());
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
@@ -711,7 +715,8 @@ public class ServerCnx extends PulsarHandler {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(topicName, authRole);
+ isProxyAuthorizedFuture = service.getAuthorizationService().canProduceAsync(topicName,
+ authRole, authenticationData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -719,8 +724,8 @@ public class ServerCnx extends PulsarHandler {
if (isProxyAuthorized) {
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationManager().canProduceAsync(topicName,
- originalPrincipal != null ? originalPrincipal : authRole);
+ authorizationFuture = service.getAuthorizationService().canProduceAsync(topicName,
+ originalPrincipal != null ? originalPrincipal : authRole, authenticationData);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5119edc..44a9e14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -69,6 +68,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 57d1d75..20ae527 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -94,6 +93,7 @@ import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.compaction.CompactedTopic;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index d55716d..a384e35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -31,6 +31,8 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ public class AuthenticationFilter implements Filter {
private final AuthenticationService authenticationService;
public static final String AuthenticatedRoleAttributeName = AuthenticationFilter.class.getName() + "-role";
+ public static final String AuthenticatedDataAttributeName = AuthenticationFilter.class.getName() + "-data";
public AuthenticationFilter(PulsarService pulsar) {
this.authenticationService = pulsar.getBrokerService().getAuthenticationService();
@@ -56,6 +59,8 @@ public class AuthenticationFilter implements Filter {
try {
String role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
request.setAttribute(AuthenticatedRoleAttributeName, role);
+ request.setAttribute(AuthenticatedDataAttributeName,
+ new AuthenticationDataHttps((HttpServletRequest) request));
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 40b7d03..c17182c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
@@ -41,11 +42,12 @@ import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.Namespaces;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -127,6 +129,10 @@ public abstract class PulsarWebResource {
return (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
}
+ public AuthenticationDataHttps clientAuthData() {
+ return (AuthenticationDataHttps) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
+ }
+
public boolean isRequestHttps() {
return "https".equalsIgnoreCase(httpRequest.getScheme());
}
@@ -635,17 +641,17 @@ public abstract class PulsarWebResource {
}
protected void checkConnect(DestinationName destination) throws RestException, Exception {
- checkAuthorization(pulsar(), destination, clientAppId());
+ checkAuthorization(pulsar(), destination, clientAppId(), clientAuthData());
}
- protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role)
- throws RestException, Exception {
+ protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) throws RestException, Exception {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
return;
}
// get zk policy manager
- if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) {
+ if (!pulsarService.getBrokerService().getAuthorizationService().canLookup(destination, role, authenticationData)) {
log.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index cee4bbe..69bfb35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -23,7 +23,7 @@ import static org.testng.Assert.fail;
import java.util.EnumSet;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -60,9 +60,9 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
@Test
void simple() throws Exception {
- AuthorizationManager auth = pulsar.getBrokerService().getAuthorizationManager();
+ AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
admin.clusters().updateCluster("c1", new ClusterData());
admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
@@ -70,69 +70,69 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.of(AuthAction.produce));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
EnumSet.of(AuthAction.consume));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, null), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null, null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null), false);
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.allOf(AuthAction.class));
waitForChange();
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null), true);
// test for wildcard
// namespace prefix match
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), false);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my.role.*", EnumSet.of(AuthAction.produce));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
// namespace suffix match
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), false);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "*.role.my", EnumSet.of(AuthAction.consume));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
// revoke for next test
admin.namespaces().revokePermissionsOnNamespace("p1/c1/ns1", "my.role.*");
@@ -140,50 +140,50 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
waitForChange();
// destination prefix match
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), false);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2", null), false);
admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds1", "my.*",
EnumSet.of(AuthAction.produce));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2", null), false);
// destination suffix match
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), false);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my", null), false);
admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds1", "*.my",
EnumSet.of(AuthAction.consume));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my", null), false);
admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "my.*");
admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "*.my");
@@ -193,19 +193,19 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.Prefix);
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1"), true);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2"), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", null), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", null), true);
try {
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "sub1"), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1"), false);
fail();
} catch (Exception e) {}
try {
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "sub2"), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2"), false);
fail();
} catch (Exception e) {}
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "role1-sub1"), true);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "role2-sub2"), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1"), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2"), true);
admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.properties().deleteProperty("p1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c23d726..75b91c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -176,7 +176,11 @@ public abstract class MockedPulsarServiceBaseTest {
PulsarService pulsar = spy(new PulsarService(conf));
setupBrokerMocks(pulsar);
+ boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
+ // enable authrorization to initialize authorization service which is used by grant-permission
+ conf.setAuthorizationEnabled(true);
pulsar.start();
+ conf.setAuthorizationEnabled(isAuthorizationEnabled);
return pulsar;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index fb44cbf..e7161b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -125,12 +125,13 @@ public class AntiAffinityNamespaceGroupTest {
config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
config1.setFailureDomainsEnabled(true);
config1.setLoadBalancerEnabled(true);
+ config1.setAdvertisedAddress("localhost");
createCluster(bkEnsemble.getZkClient(), config1);
pulsar1 = new PulsarService(config1);
pulsar1.start();
- primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT);
+ primaryHost = String.format("%s:%d", "localhost", PRIMARY_BROKER_WEBSERVICE_PORT);
url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
admin1 = new PulsarAdmin(url1, (Authentication) null);
@@ -143,7 +144,7 @@ public class AntiAffinityNamespaceGroupTest {
config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
config2.setFailureDomainsEnabled(true);
pulsar2 = new PulsarService(config2);
- secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
+ secondaryHost = String.format("%s:%d", "localhost",
SECONDARY_BROKER_WEBSERVICE_PORT);
pulsar2.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java
index 12e2c66..645fff9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java
@@ -41,7 +41,7 @@ import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.lookup.DestinationLookup;
import org.apache.pulsar.broker.lookup.NamespaceData;
@@ -70,7 +70,7 @@ public class HttpDestinationLookupv2Test {
private PulsarService pulsar;
private NamespaceService ns;
- private AuthorizationManager auth;
+ private AuthorizationService auth;
private ServiceConfiguration config;
private ConfigurationCacheService mockConfigCache;
private ZooKeeperChildrenCache clustersListCache;
@@ -83,7 +83,7 @@ public class HttpDestinationLookupv2Test {
public void setUp() throws Exception {
pulsar = mock(PulsarService.class);
ns = mock(NamespaceService.class);
- auth = mock(AuthorizationManager.class);
+ auth = mock(AuthorizationService.class);
mockConfigCache = mock(ConfigurationCacheService.class);
clustersListCache = mock(ZooKeeperChildrenCache.class);
clustersCache = mock(ZooKeeperDataCache.class);
@@ -112,7 +112,7 @@ public class HttpDestinationLookupv2Test {
doReturn(ns).when(pulsar).getNamespaceService();
BrokerService brokerService = mock(BrokerService.class);
doReturn(brokerService).when(pulsar).getBrokerService();
- doReturn(auth).when(brokerService).getAuthorizationManager();
+ doReturn(auth).when(brokerService).getAuthorizationService();
doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 6b9509c..b34e7d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 3385c7a..c172b01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -42,9 +42,9 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 64567d2..7a65cbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -45,11 +45,11 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.util.FutureUtil;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 8b2c474..344ad23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -43,10 +43,10 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index bc0e561..0a5c154 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -61,14 +62,14 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
@@ -428,9 +429,10 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testProducerCommandWithAuthorizationPositive() throws Exception {
- AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
- doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any());
- doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+ AuthorizationService authorizationService = mock(AuthorizationService.class);
+ doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canProduceAsync(Mockito.any(),
+ Mockito.any(), Mockito.any());
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
resetChannel();
setChannelConnected();
@@ -459,10 +461,15 @@ public class ServerCnxTest {
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(matches(".*nonexistent.*"));
- AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService));
- doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+ AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
- doReturn(false).when(authorizationManager).isSuperUser(Mockito.anyString());
+ svcConfig.setAuthorizationEnabled(true);
+ Field providerField = AuthorizationService.class.getDeclaredField("provider");
+ providerField.setAccessible(true);
+ PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+ providerField.set(authorizationService, authorizationProvider);
+ doReturn(false).when(authorizationProvider).isSuperUser(Mockito.anyString());
// Test producer creation
resetChannel();
@@ -484,11 +491,16 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testClusterAccess() throws Exception {
- AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService));
- doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+ svcConfig.setAuthorizationEnabled(true);
+ AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+ Field providerField = AuthorizationService.class.getDeclaredField("provider");
+ providerField.setAccessible(true);
+ PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+ providerField.set(authorizationService, authorizationProvider);
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
- doReturn(false).when(authorizationManager).isSuperUser(Mockito.anyString());
- doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).checkPermission(any(DestinationName.class), Mockito.anyString(),
+ doReturn(false).when(authorizationProvider).isSuperUser(Mockito.anyString());
+ doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(DestinationName.class), Mockito.anyString(),
any(AuthAction.class));
resetChannel();
@@ -508,10 +520,14 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testNonExistentTopicSuperUserAccess() throws Exception {
- AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService));
- doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+ AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
- doReturn(true).when(authorizationManager).isSuperUser(Mockito.anyString());
+ Field providerField = AuthorizationService.class.getDeclaredField("provider");
+ providerField.setAccessible(true);
+ PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+ providerField.set(authorizationService, authorizationProvider);
+ doReturn(true).when(authorizationProvider).isSuperUser(Mockito.anyString());
// Test producer creation
resetChannel();
@@ -540,9 +556,10 @@ public class ServerCnxTest {
}
public void testProducerCommandWithAuthorizationNegative() throws Exception {
- AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
- doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any());
- doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+ AuthorizationService authorizationService = mock(AuthorizationService.class);
+ doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canProduceAsync(Mockito.any(),
+ Mockito.any(), Mockito.any());
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
doReturn("prod1").when(brokerService).generateUniqueProducerName();
@@ -1127,9 +1144,10 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
- AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
- doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any());
- doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+ AuthorizationService authorizationService = mock(AuthorizationService.class);
+ doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canConsumeAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
resetChannel();
@@ -1147,9 +1165,10 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
- AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
- doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any());
- doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+ AuthorizationService authorizationService = mock(AuthorizationService.class);
+ doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canConsumeAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index aefd645..bcc5470 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.ReaderListener;
-import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
new file mode 100644
index 0000000..4f4fcd6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
+
+ private final static String clientRole = "plugbleRole";
+
+ protected void setup() throws Exception {
+
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("superUser");
+ conf.setSuperUserRoles(superUserRoles);
+
+ conf.setBrokerClientAuthenticationPlugin(TestAuthenticationProvider.class.getName());
+
+ Set<String> providers = new HashSet<>();
+ providers.add(TestAuthenticationProvider.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName("use");
+
+ super.init();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ /**
+ * It verifies plugable authorization service
+ *
+ * <pre>
+ * 1. Client passes correct authorization plugin-name + correct auth role: SUCCESS
+ * 2. Client passes correct authorization plugin-name + incorrect auth-role: FAIL
+ * 3. Client passes incorrect authorization plugin-name + correct auth-role: FAIL
+ * </pre>
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testProducerAndConsumerAuthorization() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
+ setup();
+
+ ClientConfiguration adminConf = new ClientConfiguration();
+ Authentication adminAuthentication = new ClientAuthentication("superUser");
+ adminConf.setAuthentication(adminAuthentication);
+ admin = spy(new PulsarAdmin(brokerUrl, adminConf));
+
+ String lookupUrl;
+ lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+ ClientConfiguration clientConfValid = new ClientConfiguration();
+ Authentication authentication = new ClientAuthentication(clientRole);
+ clientConfValid.setAuthentication(authentication);
+
+ ClientConfiguration clientConfInvalidRole = new ClientConfiguration();
+ Authentication authenticationInvalidRole = new ClientAuthentication("test-role");
+ clientConfInvalidRole.setAuthentication(authenticationInvalidRole);
+
+ pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
+ PulsarClient pulsarClientInvalidRole = PulsarClient.create(lookupUrl, clientConfInvalidRole);
+
+ admin.properties().createProperty("my-property",
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+ admin.namespaces().createNamespace("my-property/use/my-ns");
+
+ // (1) Valid Producer and consumer creation
+ Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", "my-subscriber-name");
+ Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic");
+ consumer.close();
+ producer.close();
+
+ // (2) InValid user auth-role will be rejected by authorization service
+ try {
+ consumer = pulsarClientInvalidRole.subscribe("persistent://my-property/use/my-ns/my-topic",
+ "my-subscriber-name");
+ Assert.fail("should have failed with authorization error");
+ } catch (PulsarClientException.AuthorizationException pa) {
+ // Ok
+ }
+ try {
+ producer = pulsarClientInvalidRole.createProducer("persistent://my-property/use/my-ns/my-topic");
+ Assert.fail("should have failed with authorization error");
+ } catch (PulsarClientException.AuthorizationException pa) {
+ // Ok
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test
+ public void testGrantPermission() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ conf.setAuthorizationProvider(TestAuthorizationProviderWithGrantPermission.class.getName());
+ setup();
+
+ AuthorizationService authorizationService = new AuthorizationService(conf, null);
+ DestinationName destination = DestinationName.get("persistent://prop/cluster/ns/t1");
+ String role = "test-role";
+ Assert.assertFalse(authorizationService.canProduce(destination, role, null));
+ Assert.assertFalse(authorizationService.canConsume(destination, role, null, "sub1"));
+ authorizationService
+ .grantPermissionAsync(destination, null, role, "auth-json").get();
+ Assert.assertTrue(authorizationService.canProduce(destination, role, null));
+ Assert.assertTrue(authorizationService.canConsume(destination, role, null, "sub1"));
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test
+ public void testAuthData() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ conf.setAuthorizationProvider(TestAuthorizationProviderWithGrantPermission.class.getName());
+ setup();
+
+ AuthorizationService authorizationService = new AuthorizationService(conf, null);
+ DestinationName destination = DestinationName.get("persistent://prop/cluster/ns/t1");
+ String role = "test-role";
+ authorizationService
+ .grantPermissionAsync(destination, null, role, "auth-json")
+ .get();
+ Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authDataJson, "auth-json");
+ Assert.assertTrue(
+ authorizationService.canProduce(destination, role, new AuthenticationDataCommand("prod-auth")));
+ Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authenticationData.getCommandData(),
+ "prod-auth");
+ Assert.assertTrue(authorizationService.canConsume(destination, role, new AuthenticationDataCommand("cons-auth"),
+ "sub1"));
+ Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authenticationData.getCommandData(),
+ "cons-auth");
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ public static class ClientAuthentication implements Authentication {
+ String user;
+
+ public ClientAuthentication(String user) {
+ this.user = user;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "test";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ AuthenticationDataProvider provider = new AuthenticationDataProvider() {
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Set<Map.Entry<String, String>> getHttpHeaders() {
+ return Sets.newHashSet(Maps.immutableEntry("user", user));
+ }
+
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ public String getCommandData() {
+ return user;
+ }
+ };
+ return provider;
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ // No-op
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // No-op
+ }
+
+ }
+
+ public static class TestAuthenticationProvider implements AuthenticationProvider {
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+ // No-op
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "test";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+ return authData.getCommandData() != null ? authData.getCommandData() : authData.getHttpHeader("user");
+ }
+
+ }
+
+ public static class TestAuthorizationProvider implements AuthorizationProvider {
+
+ @Override
+ public void close() throws IOException {
+ // No-op
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ // No-op
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(clientRole.equals(role));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ return CompletableFuture.completedFuture(clientRole.equals(role));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(clientRole.equals(role));
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+ String role, String authenticationData) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions,
+ String role, String authenticationData) {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ /**
+ * This provider always fails authorization on consumer and passes on producer
+ *
+ */
+ public static class TestAuthorizationProvider2 extends TestAuthorizationProvider {
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+ return CompletableFuture.completedFuture(true);
+ }
+ }
+
+ public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
+
+ private Set<String> grantRoles = Sets.newHashSet();
+ static AuthenticationDataSource authenticationData;
+ static String authDataJson;
+
+ @Override
+ public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+ this.authenticationData = authenticationData;
+ return CompletableFuture.completedFuture(grantRoles.contains(role));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ this.authenticationData = authenticationData;
+ return CompletableFuture.completedFuture(grantRoles.contains(role));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) {
+ this.authenticationData = authenticationData;
+ return CompletableFuture.completedFuture(grantRoles.contains(role));
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+ String role, String authData) {
+ this.authDataJson = authData;
+ grantRoles.add(role);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions,
+ String role, String authData) {
+ this.authDataJson = authData;
+ grantRoles.add(role);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 458321c..80d2d00 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -250,7 +250,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// enable authorization: so, broker can validate cluster and redirect if finds different cluster
pulsar.getConfiguration().setAuthorizationEnabled(true);
- // restart broker with authorization enabled: it initialize AuthorizationManager
+ // restart broker with authorization enabled: it initialize AuthorizationService
stopBroker();
startBroker();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4200d92..9635c43 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -56,9 +56,9 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 342e232..5454bb7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -47,7 +47,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -63,12 +63,12 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.HandlerBase.State;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
@@ -81,6 +81,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class BrokerClientIntegrationTest extends ProducerConsumerBase {
@@ -580,6 +581,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
int concurrentTopic = pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest();
try {
+ pulsar.getConfiguration().setAuthorizationEnabled(false);
final int concurrentLookupRequests = 20;
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setMaxNumberOfRejectedRequestPerConnection(0);
@@ -632,78 +634,72 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
@Test(timeOut = 5000)
public void testCloseConnectionOnInternalServerError() throws Exception {
- try {
- final PulsarClient pulsarClient;
-
- final String topicName = "persistent://prop/usw/my-ns/newTopic";
+ final PulsarClient pulsarClient;
- ClientConfiguration clientConf = new ClientConfiguration();
- clientConf.setStatsInterval(0, TimeUnit.SECONDS);
- String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
- pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+ final String topicName = "persistent://prop/usw/my-ns/newTopic";
- ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
- ClientCnx cnx = producer.cnx();
- assertTrue(cnx.channel().isActive());
- // this will throw NPE at broker while authorizing and it will throw InternalServerError
- pulsar.getConfiguration().setAuthorizationEnabled(true);
- try {
- pulsarClient.createProducer(topicName);
- fail("it should have fail with lookup-exception:");
- } catch (Exception e) {
- // ok
- }
- // connection must be closed
- assertFalse(cnx.channel().isActive());
- pulsarClient.close();
- } finally {
- pulsar.getConfiguration().setAuthorizationEnabled(false);
+ ClientConfiguration clientConf = new ClientConfiguration();
+ clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+ String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+ pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+
+ ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
+ ClientCnx cnx = producer.cnx();
+ assertTrue(cnx.channel().isActive());
+
+ // Need broker to throw InternalServerError. so, make global-zk unavailable
+ Field globalZkCacheField = PulsarService.class.getDeclaredField("globalZkCache");
+ globalZkCacheField.setAccessible(true);
+ globalZkCacheField.set(pulsar, null);
+
+ try {
+ pulsarClient.createProducer(topicName);
+ fail("it should have fail with lookup-exception:");
+ } catch (Exception e) {
+ // ok
}
+ // connection must be closed
+ assertFalse(cnx.channel().isActive());
+ pulsarClient.close();
}
@Test
public void testInvalidDynamicConfiguration() throws Exception {
+ // (1) try to update invalid loadManagerClass name
try {
- // (1) try to update invalid loadManagerClass name
- try {
- admin.brokers().updateDynamicConfiguration("loadManagerClassName",
- "org.apache.pulsar.invalid.loadmanager");
- fail("it should have failed due to invalid argument");
- } catch (PulsarAdminException e) {
- // Ok: should have failed due to invalid config value
- }
+ admin.brokers().updateDynamicConfiguration("loadManagerClassName",
+ "org.apache.pulsar.invalid.loadmanager");
+ fail("it should have failed due to invalid argument");
+ } catch (PulsarAdminException e) {
+ // Ok: should have failed due to invalid config value
+ }
- // (2) try to update with valid loadManagerClass name
- try {
- admin.brokers().updateDynamicConfiguration("loadManagerClassName",
- "org.apache.pulsar.broker.loadbalance.ModularLoadManager");
- } catch (PulsarAdminException e) {
- fail("it should have failed due to invalid argument", e);
- }
+ // (2) try to update with valid loadManagerClass name
+ try {
+ admin.brokers().updateDynamicConfiguration("loadManagerClassName",
+ "org.apache.pulsar.broker.loadbalance.ModularLoadManager");
+ } catch (PulsarAdminException e) {
+ fail("it should have failed due to invalid argument", e);
+ }
- // (3) restart broker with invalid config value
+ // (3) restart broker with invalid config value
- ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar.getBrokerService()
- .getDynamicConfigurationCache();
- Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
- .get();
- configurationMap.put("loadManagerClassName", "org.apache.pulsar.invalid.loadmanager");
- byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
- dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
- mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1);
+ ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar.getBrokerService()
+ .getDynamicConfigurationCache();
+ Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+ .get();
+ configurationMap.put("loadManagerClassName", "org.apache.pulsar.invalid.loadmanager");
+ byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
+ dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
+ mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1);
- try {
- stopBroker();
- startBroker();
- fail("it should have failed due to invalid argument");
- } catch (Exception e) {
- // Ok: should have failed due to invalid config value
- }
- } finally {
- byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(Maps.newHashMap());
- mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1);
+ try {
+ stopBroker();
startBroker();
+ fail("it should have failed due to invalid argument");
+ } catch (Exception e) {
+ // Ok: should have failed due to invalid config value
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index 302f2e4..943c62d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -27,7 +27,7 @@ import java.util.Set;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -78,9 +78,9 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
@Test
public void test() throws Exception {
- AuthorizationManager auth = service.getAuthorizationManager();
+ AuthorizationService auth = service.getAuthorizationService();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
admin.clusters().updateCluster(configClusterName, new ClusterData());
admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
@@ -88,31 +88,31 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.of(AuthAction.produce));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
EnumSet.of(AuthAction.consume));
waitForChange();
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), false);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, null), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null, null), false);
- assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false);
+ assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null), false);
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.allOf(AuthAction.class));
waitForChange();
- assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
- assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+ assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null), true);
admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.properties().deleteProperty("p1");
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 1ef50e8..3ad4059 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -43,8 +43,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 97cde46..b5afd0e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -59,8 +59,8 @@ import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.util.ConsumerName;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index 9e4aece..9ab7b32 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -145,6 +145,7 @@ public class ClientConfiguration implements Serializable {
this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
}
+
/**
* @return the operation timeout in ms
*/
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 38e96ed..9ab379d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -129,8 +129,8 @@ public class ClientCnx extends PulsarHandler {
authData = authentication.getAuthData().getCommandData();
}
// Send CONNECT command
- ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData, getPulsarClientVersion(),
- proxyToTargetBrokerAddress))
+ ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData,
+ getPulsarClientVersion(), proxyToTargetBrokerAddress))
.addListener(future -> {
if (future.isSuccess()) {
if (log.isDebugEnabled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6ef58b2..be5e658 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -36,9 +36,9 @@ import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.util.ConsumerName;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import com.google.common.collect.Queues;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 671929c..b74bb13 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -57,6 +56,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index d4e1f54..7ad6e1a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -34,6 +34,7 @@ import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslContext;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -55,15 +56,16 @@ public class HttpClient implements Closeable {
protected final URL url;
protected final Authentication authentication;
- protected HttpClient(String serviceUrl, Authentication authentication, EventLoopGroup eventLoopGroup,
- boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath) throws PulsarClientException {
- this(serviceUrl, authentication, eventLoopGroup, tlsAllowInsecureConnection, tlsTrustCertsFilePath,
- DEFAULT_CONNECT_TIMEOUT_IN_SECONDS, DEFAULT_READ_TIMEOUT_IN_SECONDS);
+ protected HttpClient(String serviceUrl, Authentication authentication,
+ EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath)
+ throws PulsarClientException {
+ this(serviceUrl, authentication, eventLoopGroup, tlsAllowInsecureConnection,
+ tlsTrustCertsFilePath, DEFAULT_CONNECT_TIMEOUT_IN_SECONDS, DEFAULT_READ_TIMEOUT_IN_SECONDS);
}
- protected HttpClient(String serviceUrl, Authentication authentication, EventLoopGroup eventLoopGroup,
- boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath, int connectTimeoutInSeconds,
- int readTimeoutInSeconds) throws PulsarClientException {
+ protected HttpClient(String serviceUrl, Authentication authentication,
+ EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath,
+ int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException {
this.authentication = authentication;
try {
// Ensure trailing "/" on url
@@ -129,7 +131,7 @@ public class HttpClient implements Closeable {
builder.setHeader(header.getKey(), header.getValue());
}
}
-
+
final ListenableFuture<Response> responseFuture = builder.setHeader("Accept", "application/json")
.execute(new AsyncCompletionHandler<Response>() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 8c12ab1..208a9b1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
@@ -26,10 +25,10 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +42,8 @@ class HttpLookupService implements LookupService {
public HttpLookupService(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
- this.httpClient = new HttpClient(serviceUrl, conf.getAuthentication(), eventLoopGroup,
- conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
+ this.httpClient = new HttpClient(serviceUrl, conf.getAuthentication(),
+ eventLoopGroup, conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
this.useTls = conf.isUseTls();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 1dd3e4b..60e3928 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -40,9 +40,9 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index fe9ae6c..68fda50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -35,8 +35,8 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMetadata;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 362cf6e..f6bd759 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -42,10 +42,10 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.util.ExecutorProvider;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -471,7 +471,7 @@ public class PulsarClientImpl implements PulsarClient {
DestinationName destinationName = DestinationName.get(topic);
metadataFuture = lookup.getPartitionedTopicMetadata(destinationName);
} catch (IllegalArgumentException e) {
- return FutureUtil.failedFuture(e);
+ return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
}
return metadataFuture;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 2c13125..6bbe77e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -279,13 +279,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use MessageIdData.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<MessageIdData> handle;
- private MessageIdData(io.netty.util.Recycler.Handle<MessageIdData> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private MessageIdData(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<MessageIdData> RECYCLER = new io.netty.util.Recycler<MessageIdData>() {
- protected MessageIdData newObject(Handle<MessageIdData> handle) {
+ protected MessageIdData newObject(Handle handle) {
return new MessageIdData(handle);
}
};
@@ -295,12 +295,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private MessageIdData(boolean noInit) {
- this.handle = null;
- }
+ private MessageIdData(boolean noInit) {}
private static final MessageIdData defaultInstance;
public static MessageIdData getDefaultInstance() {
@@ -508,20 +506,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -780,13 +778,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements KeyValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use KeyValue.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<KeyValue> handle;
- private KeyValue(io.netty.util.Recycler.Handle<KeyValue> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private KeyValue(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<KeyValue> RECYCLER = new io.netty.util.Recycler<KeyValue>() {
- protected KeyValue newObject(Handle<KeyValue> handle) {
+ protected KeyValue newObject(Handle handle) {
return new KeyValue(handle);
}
};
@@ -796,12 +794,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private KeyValue(boolean noInit) {
- this.handle = null;
- }
+ private KeyValue(boolean noInit) {}
private static final KeyValue defaultInstance;
public static KeyValue getDefaultInstance() {
@@ -1017,20 +1013,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.KeyValue, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -1249,13 +1245,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements KeyLongValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use KeyLongValue.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<KeyLongValue> handle;
- private KeyLongValue(io.netty.util.Recycler.Handle<KeyLongValue> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private KeyLongValue(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<KeyLongValue> RECYCLER = new io.netty.util.Recycler<KeyLongValue>() {
- protected KeyLongValue newObject(Handle<KeyLongValue> handle) {
+ protected KeyLongValue newObject(Handle handle) {
return new KeyLongValue(handle);
}
};
@@ -1265,12 +1261,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private KeyLongValue(boolean noInit) {
- this.handle = null;
- }
+ private KeyLongValue(boolean noInit) {}
private static final KeyLongValue defaultInstance;
public static KeyLongValue getDefaultInstance() {
@@ -1464,20 +1458,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -1687,13 +1681,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements EncryptionKeysOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use EncryptionKeys.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<EncryptionKeys> handle;
- private EncryptionKeys(io.netty.util.Recycler.Handle<EncryptionKeys> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private EncryptionKeys(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<EncryptionKeys> RECYCLER = new io.netty.util.Recycler<EncryptionKeys>() {
- protected EncryptionKeys newObject(Handle<EncryptionKeys> handle) {
+ protected EncryptionKeys newObject(Handle handle) {
return new EncryptionKeys(handle);
}
};
@@ -1703,12 +1697,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private EncryptionKeys(boolean noInit) {
- this.handle = null;
- }
+ private EncryptionKeys(boolean noInit) {}
private static final EncryptionKeys defaultInstance;
public static EncryptionKeys getDefaultInstance() {
@@ -1937,20 +1929,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeysOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -2328,13 +2320,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements MessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use MessageMetadata.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<MessageMetadata> handle;
- private MessageMetadata(io.netty.util.Recycler.Handle<MessageMetadata> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private MessageMetadata(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<MessageMetadata> RECYCLER = new io.netty.util.Recycler<MessageMetadata>() {
- protected MessageMetadata newObject(Handle<MessageMetadata> handle) {
+ protected MessageMetadata newObject(Handle handle) {
return new MessageMetadata(handle);
}
};
@@ -2344,12 +2336,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private MessageMetadata(boolean noInit) {
- this.handle = null;
- }
+ private MessageMetadata(boolean noInit) {}
private static final MessageMetadata defaultInstance;
public static MessageMetadata getDefaultInstance() {
@@ -2872,20 +2862,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -3784,13 +3774,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements SingleMessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use SingleMessageMetadata.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<SingleMessageMetadata> handle;
- private SingleMessageMetadata(io.netty.util.Recycler.Handle<SingleMessageMetadata> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private SingleMessageMetadata(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<SingleMessageMetadata> RECYCLER = new io.netty.util.Recycler<SingleMessageMetadata>() {
- protected SingleMessageMetadata newObject(Handle<SingleMessageMetadata> handle) {
+ protected SingleMessageMetadata newObject(Handle handle) {
return new SingleMessageMetadata(handle);
}
};
@@ -3800,12 +3790,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private SingleMessageMetadata(boolean noInit) {
- this.handle = null;
- }
+ private SingleMessageMetadata(boolean noInit) {}
private static final SingleMessageMetadata defaultInstance;
public static SingleMessageMetadata getDefaultInstance() {
@@ -4030,20 +4018,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -4389,13 +4377,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandConnectOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandConnect.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandConnect> handle;
- private CommandConnect(io.netty.util.Recycler.Handle<CommandConnect> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandConnect(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandConnect> RECYCLER = new io.netty.util.Recycler<CommandConnect>() {
- protected CommandConnect newObject(Handle<CommandConnect> handle) {
+ protected CommandConnect newObject(Handle handle) {
return new CommandConnect(handle);
}
};
@@ -4405,12 +4393,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandConnect(boolean noInit) {
- this.handle = null;
- }
+ private CommandConnect(boolean noInit) {}
private static final CommandConnect defaultInstance;
public static CommandConnect getDefaultInstance() {
@@ -4836,20 +4822,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConnectOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -5379,13 +5365,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandConnectedOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandConnected.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandConnected> handle;
- private CommandConnected(io.netty.util.Recycler.Handle<CommandConnected> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandConnected(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandConnected> RECYCLER = new io.netty.util.Recycler<CommandConnected>() {
- protected CommandConnected newObject(Handle<CommandConnected> handle) {
+ protected CommandConnected newObject(Handle handle) {
return new CommandConnected(handle);
}
};
@@ -5395,12 +5381,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandConnected(boolean noInit) {
- this.handle = null;
- }
+ private CommandConnected(boolean noInit) {}
private static final CommandConnected defaultInstance;
public static CommandConnected getDefaultInstance() {
@@ -5590,20 +5574,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConnectedOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -5841,13 +5825,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandSubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandSubscribe.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandSubscribe> handle;
- private CommandSubscribe(io.netty.util.Recycler.Handle<CommandSubscribe> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandSubscribe(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandSubscribe> RECYCLER = new io.netty.util.Recycler<CommandSubscribe>() {
- protected CommandSubscribe newObject(Handle<CommandSubscribe> handle) {
+ protected CommandSubscribe newObject(Handle handle) {
return new CommandSubscribe(handle);
}
};
@@ -5857,12 +5841,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandSubscribe(boolean noInit) {
- this.handle = null;
- }
+ private CommandSubscribe(boolean noInit) {}
private static final CommandSubscribe defaultInstance;
public static CommandSubscribe getDefaultInstance() {
@@ -6341,20 +6323,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -7050,13 +7032,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandPartitionedTopicMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandPartitionedTopicMetadata.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadata> handle;
- private CommandPartitionedTopicMetadata(io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadata> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandPartitionedTopicMetadata(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandPartitionedTopicMetadata> RECYCLER = new io.netty.util.Recycler<CommandPartitionedTopicMetadata>() {
- protected CommandPartitionedTopicMetadata newObject(Handle<CommandPartitionedTopicMetadata> handle) {
+ protected CommandPartitionedTopicMetadata newObject(Handle handle) {
return new CommandPartitionedTopicMetadata(handle);
}
};
@@ -7066,12 +7048,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandPartitionedTopicMetadata(boolean noInit) {
- this.handle = null;
- }
+ private CommandPartitionedTopicMetadata(boolean noInit) {}
private static final CommandPartitionedTopicMetadata defaultInstance;
public static CommandPartitionedTopicMetadata getDefaultInstance() {
@@ -7385,20 +7365,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -7764,13 +7744,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandPartitionedTopicMetadataResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandPartitionedTopicMetadataResponse.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadataResponse> handle;
- private CommandPartitionedTopicMetadataResponse(io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadataResponse> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandPartitionedTopicMetadataResponse(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandPartitionedTopicMetadataResponse> RECYCLER = new io.netty.util.Recycler<CommandPartitionedTopicMetadataResponse>() {
- protected CommandPartitionedTopicMetadataResponse newObject(Handle<CommandPartitionedTopicMetadataResponse> handle) {
+ protected CommandPartitionedTopicMetadataResponse newObject(Handle handle) {
return new CommandPartitionedTopicMetadataResponse(handle);
}
};
@@ -7780,12 +7760,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandPartitionedTopicMetadataResponse(boolean noInit) {
- this.handle = null;
- }
+ private CommandPartitionedTopicMetadataResponse(boolean noInit) {}
private static final CommandPartitionedTopicMetadataResponse defaultInstance;
public static CommandPartitionedTopicMetadataResponse getDefaultInstance() {
@@ -8070,20 +8048,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -8418,13 +8396,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandLookupTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandLookupTopic.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandLookupTopic> handle;
- private CommandLookupTopic(io.netty.util.Recycler.Handle<CommandLookupTopic> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandLookupTopic(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandLookupTopic> RECYCLER = new io.netty.util.Recycler<CommandLookupTopic>() {
- protected CommandLookupTopic newObject(Handle<CommandLookupTopic> handle) {
+ protected CommandLookupTopic newObject(Handle handle) {
return new CommandLookupTopic(handle);
}
};
@@ -8434,12 +8412,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandLookupTopic(boolean noInit) {
- this.handle = null;
- }
+ private CommandLookupTopic(boolean noInit) {}
private static final CommandLookupTopic defaultInstance;
public static CommandLookupTopic getDefaultInstance() {
@@ -8771,20 +8747,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -9197,13 +9173,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandLookupTopicResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandLookupTopicResponse.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandLookupTopicResponse> handle;
- private CommandLookupTopicResponse(io.netty.util.Recycler.Handle<CommandLookupTopicResponse> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandLookupTopicResponse(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandLookupTopicResponse> RECYCLER = new io.netty.util.Recycler<CommandLookupTopicResponse>() {
- protected CommandLookupTopicResponse newObject(Handle<CommandLookupTopicResponse> handle) {
+ protected CommandLookupTopicResponse newObject(Handle handle) {
return new CommandLookupTopicResponse(handle);
}
};
@@ -9213,12 +9189,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandLookupTopicResponse(boolean noInit) {
- this.handle = null;
- }
+ private CommandLookupTopicResponse(boolean noInit) {}
private static final CommandLookupTopicResponse defaultInstance;
public static CommandLookupTopicResponse getDefaultInstance() {
@@ -9604,20 +9578,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -10089,13 +10063,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandProducer.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandProducer> handle;
- private CommandProducer(io.netty.util.Recycler.Handle<CommandProducer> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandProducer(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandProducer> RECYCLER = new io.netty.util.Recycler<CommandProducer>() {
- protected CommandProducer newObject(Handle<CommandProducer> handle) {
+ protected CommandProducer newObject(Handle handle) {
return new CommandProducer(handle);
}
};
@@ -10105,12 +10079,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandProducer(boolean noInit) {
- this.handle = null;
- }
+ private CommandProducer(boolean noInit) {}
private static final CommandProducer defaultInstance;
public static CommandProducer getDefaultInstance() {
@@ -10419,20 +10391,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -10882,13 +10854,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandSendOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandSend.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandSend> handle;
- private CommandSend(io.netty.util.Recycler.Handle<CommandSend> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandSend(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandSend> RECYCLER = new io.netty.util.Recycler<CommandSend>() {
- protected CommandSend newObject(Handle<CommandSend> handle) {
+ protected CommandSend newObject(Handle handle) {
return new CommandSend(handle);
}
};
@@ -10898,12 +10870,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandSend(boolean noInit) {
- this.handle = null;
- }
+ private CommandSend(boolean noInit) {}
private static final CommandSend defaultInstance;
public static CommandSend getDefaultInstance() {
@@ -11093,20 +11063,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandSend, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSendOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSend.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -11334,13 +11304,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandSendReceiptOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandSendReceipt.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandSendReceipt> handle;
- private CommandSendReceipt(io.netty.util.Recycler.Handle<CommandSendReceipt> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandSendReceipt(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandSendReceipt> RECYCLER = new io.netty.util.Recycler<CommandSendReceipt>() {
- protected CommandSendReceipt newObject(Handle<CommandSendReceipt> handle) {
+ protected CommandSendReceipt newObject(Handle handle) {
return new CommandSendReceipt(handle);
}
};
@@ -11350,12 +11320,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandSendReceipt(boolean noInit) {
- this.handle = null;
- }
+ private CommandSendReceipt(boolean noInit) {}
private static final CommandSendReceipt defaultInstance;
public static CommandSendReceipt getDefaultInstance() {
@@ -11551,20 +11519,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceiptOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -11829,13 +11797,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandSendErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandSendError.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandSendError> handle;
- private CommandSendError(io.netty.util.Recycler.Handle<CommandSendError> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandSendError(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandSendError> RECYCLER = new io.netty.util.Recycler<CommandSendError>() {
- protected CommandSendError newObject(Handle<CommandSendError> handle) {
+ protected CommandSendError newObject(Handle handle) {
return new CommandSendError(handle);
}
};
@@ -11845,12 +11813,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandSendError(boolean noInit) {
- this.handle = null;
- }
+ private CommandSendError(boolean noInit) {}
private static final CommandSendError defaultInstance;
public static CommandSendError getDefaultInstance() {
@@ -12088,20 +12054,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSendErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -12390,13 +12356,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandMessageOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandMessage.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandMessage> handle;
- private CommandMessage(io.netty.util.Recycler.Handle<CommandMessage> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandMessage(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandMessage> RECYCLER = new io.netty.util.Recycler<CommandMessage>() {
- protected CommandMessage newObject(Handle<CommandMessage> handle) {
+ protected CommandMessage newObject(Handle handle) {
return new CommandMessage(handle);
}
};
@@ -12406,12 +12372,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandMessage(boolean noInit) {
- this.handle = null;
- }
+ private CommandMessage(boolean noInit) {}
private static final CommandMessage defaultInstance;
public static CommandMessage getDefaultInstance() {
@@ -12587,20 +12551,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandMessageOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -12834,13 +12798,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandAckOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandAck.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandAck> handle;
- private CommandAck(io.netty.util.Recycler.Handle<CommandAck> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandAck(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandAck> RECYCLER = new io.netty.util.Recycler<CommandAck>() {
- protected CommandAck newObject(Handle<CommandAck> handle) {
+ protected CommandAck newObject(Handle handle) {
return new CommandAck(handle);
}
};
@@ -12850,12 +12814,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandAck(boolean noInit) {
- this.handle = null;
- }
+ private CommandAck(boolean noInit) {}
private static final CommandAck defaultInstance;
public static CommandAck getDefaultInstance() {
@@ -13197,20 +13159,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandAck, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandAckOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -13636,13 +13598,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandFlowOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandFlow.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandFlow> handle;
- private CommandFlow(io.netty.util.Recycler.Handle<CommandFlow> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandFlow(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandFlow> RECYCLER = new io.netty.util.Recycler<CommandFlow>() {
- protected CommandFlow newObject(Handle<CommandFlow> handle) {
+ protected CommandFlow newObject(Handle handle) {
return new CommandFlow(handle);
}
};
@@ -13652,12 +13614,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandFlow(boolean noInit) {
- this.handle = null;
- }
+ private CommandFlow(boolean noInit) {}
private static final CommandFlow defaultInstance;
public static CommandFlow getDefaultInstance() {
@@ -13829,20 +13789,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandFlowOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -14031,13 +13991,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandUnsubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandUnsubscribe.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandUnsubscribe> handle;
- private CommandUnsubscribe(io.netty.util.Recycler.Handle<CommandUnsubscribe> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandUnsubscribe(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandUnsubscribe> RECYCLER = new io.netty.util.Recycler<CommandUnsubscribe>() {
- protected CommandUnsubscribe newObject(Handle<CommandUnsubscribe> handle) {
+ protected CommandUnsubscribe newObject(Handle handle) {
return new CommandUnsubscribe(handle);
}
};
@@ -14047,12 +14007,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandUnsubscribe(boolean noInit) {
- this.handle = null;
- }
+ private CommandUnsubscribe(boolean noInit) {}
private static final CommandUnsubscribe defaultInstance;
public static CommandUnsubscribe getDefaultInstance() {
@@ -14224,20 +14182,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -14430,13 +14388,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandSeekOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandSeek.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandSeek> handle;
- private CommandSeek(io.netty.util.Recycler.Handle<CommandSeek> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandSeek(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandSeek> RECYCLER = new io.netty.util.Recycler<CommandSeek>() {
- protected CommandSeek newObject(Handle<CommandSeek> handle) {
+ protected CommandSeek newObject(Handle handle) {
return new CommandSeek(handle);
}
};
@@ -14446,12 +14404,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandSeek(boolean noInit) {
- this.handle = null;
- }
+ private CommandSeek(boolean noInit) {}
private static final CommandSeek defaultInstance;
public static CommandSeek getDefaultInstance() {
@@ -14647,20 +14603,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSeekOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -14913,13 +14869,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandReachedEndOfTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandReachedEndOfTopic.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandReachedEndOfTopic> handle;
- private CommandReachedEndOfTopic(io.netty.util.Recycler.Handle<CommandReachedEndOfTopic> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandReachedEndOfTopic(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandReachedEndOfTopic> RECYCLER = new io.netty.util.Recycler<CommandReachedEndOfTopic>() {
- protected CommandReachedEndOfTopic newObject(Handle<CommandReachedEndOfTopic> handle) {
+ protected CommandReachedEndOfTopic newObject(Handle handle) {
return new CommandReachedEndOfTopic(handle);
}
};
@@ -14929,12 +14885,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandReachedEndOfTopic(boolean noInit) {
- this.handle = null;
- }
+ private CommandReachedEndOfTopic(boolean noInit) {}
private static final CommandReachedEndOfTopic defaultInstance;
public static CommandReachedEndOfTopic getDefaultInstance() {
@@ -15084,20 +15038,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -15247,13 +15201,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandCloseProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandCloseProducer.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandCloseProducer> handle;
- private CommandCloseProducer(io.netty.util.Recycler.Handle<CommandCloseProducer> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandCloseProducer(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandCloseProducer> RECYCLER = new io.netty.util.Recycler<CommandCloseProducer>() {
- protected CommandCloseProducer newObject(Handle<CommandCloseProducer> handle) {
+ protected CommandCloseProducer newObject(Handle handle) {
return new CommandCloseProducer(handle);
}
};
@@ -15263,12 +15217,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandCloseProducer(boolean noInit) {
- this.handle = null;
- }
+ private CommandCloseProducer(boolean noInit) {}
private static final CommandCloseProducer defaultInstance;
public static CommandCloseProducer getDefaultInstance() {
@@ -15440,20 +15392,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -15642,13 +15594,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandCloseConsumerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandCloseConsumer.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandCloseConsumer> handle;
- private CommandCloseConsumer(io.netty.util.Recycler.Handle<CommandCloseConsumer> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandCloseConsumer(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandCloseConsumer> RECYCLER = new io.netty.util.Recycler<CommandCloseConsumer>() {
- protected CommandCloseConsumer newObject(Handle<CommandCloseConsumer> handle) {
+ protected CommandCloseConsumer newObject(Handle handle) {
return new CommandCloseConsumer(handle);
}
};
@@ -15658,12 +15610,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandCloseConsumer(boolean noInit) {
- this.handle = null;
- }
+ private CommandCloseConsumer(boolean noInit) {}
private static final CommandCloseConsumer defaultInstance;
public static CommandCloseConsumer getDefaultInstance() {
@@ -15835,20 +15785,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -16039,13 +15989,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandRedeliverUnacknowledgedMessagesOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandRedeliverUnacknowledgedMessages.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandRedeliverUnacknowledgedMessages> handle;
- private CommandRedeliverUnacknowledgedMessages(io.netty.util.Recycler.Handle<CommandRedeliverUnacknowledgedMessages> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandRedeliverUnacknowledgedMessages(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandRedeliverUnacknowledgedMessages> RECYCLER = new io.netty.util.Recycler<CommandRedeliverUnacknowledgedMessages>() {
- protected CommandRedeliverUnacknowledgedMessages newObject(Handle<CommandRedeliverUnacknowledgedMessages> handle) {
+ protected CommandRedeliverUnacknowledgedMessages newObject(Handle handle) {
return new CommandRedeliverUnacknowledgedMessages(handle);
}
};
@@ -16055,12 +16005,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandRedeliverUnacknowledgedMessages(boolean noInit) {
- this.handle = null;
- }
+ private CommandRedeliverUnacknowledgedMessages(boolean noInit) {}
private static final CommandRedeliverUnacknowledgedMessages defaultInstance;
public static CommandRedeliverUnacknowledgedMessages getDefaultInstance() {
@@ -16245,20 +16193,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessagesOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -16522,13 +16470,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandSuccess.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandSuccess> handle;
- private CommandSuccess(io.netty.util.Recycler.Handle<CommandSuccess> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandSuccess(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandSuccess> RECYCLER = new io.netty.util.Recycler<CommandSuccess>() {
- protected CommandSuccess newObject(Handle<CommandSuccess> handle) {
+ protected CommandSuccess newObject(Handle handle) {
return new CommandSuccess(handle);
}
};
@@ -16538,12 +16486,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandSuccess(boolean noInit) {
- this.handle = null;
- }
+ private CommandSuccess(boolean noInit) {}
private static final CommandSuccess defaultInstance;
public static CommandSuccess getDefaultInstance() {
@@ -16693,20 +16639,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -16860,13 +16806,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandProducerSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandProducerSuccess.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandProducerSuccess> handle;
- private CommandProducerSuccess(io.netty.util.Recycler.Handle<CommandProducerSuccess> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandProducerSuccess(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandProducerSuccess> RECYCLER = new io.netty.util.Recycler<CommandProducerSuccess>() {
- protected CommandProducerSuccess newObject(Handle<CommandProducerSuccess> handle) {
+ protected CommandProducerSuccess newObject(Handle handle) {
return new CommandProducerSuccess(handle);
}
};
@@ -16876,12 +16822,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandProducerSuccess(boolean noInit) {
- this.handle = null;
- }
+ private CommandProducerSuccess(boolean noInit) {}
private static final CommandProducerSuccess defaultInstance;
public static CommandProducerSuccess getDefaultInstance() {
@@ -17093,20 +17037,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -17349,13 +17293,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandError.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandError> handle;
- private CommandError(io.netty.util.Recycler.Handle<CommandError> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandError(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandError> RECYCLER = new io.netty.util.Recycler<CommandError>() {
- protected CommandError newObject(Handle<CommandError> handle) {
+ protected CommandError newObject(Handle handle) {
return new CommandError(handle);
}
};
@@ -17365,12 +17309,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandError(boolean noInit) {
- this.handle = null;
- }
+ private CommandError(boolean noInit) {}
private static final CommandError defaultInstance;
public static CommandError getDefaultInstance() {
@@ -17586,20 +17528,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandError, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandError.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -17841,13 +17783,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandPingOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandPing.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandPing> handle;
- private CommandPing(io.netty.util.Recycler.Handle<CommandPing> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandPing(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandPing> RECYCLER = new io.netty.util.Recycler<CommandPing>() {
- protected CommandPing newObject(Handle<CommandPing> handle) {
+ protected CommandPing newObject(Handle handle) {
return new CommandPing(handle);
}
};
@@ -17856,12 +17798,10 @@ public final class PulsarApi {
this.initFields();
this.memoizedIsInitialized = -1;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandPing(boolean noInit) {
- this.handle = null;
- }
+ private CommandPing(boolean noInit) {}
private static final CommandPing defaultInstance;
public static CommandPing getDefaultInstance() {
@@ -17988,20 +17928,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandPing, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPingOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPing.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -18100,13 +18040,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandPongOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandPong.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandPong> handle;
- private CommandPong(io.netty.util.Recycler.Handle<CommandPong> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandPong(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandPong> RECYCLER = new io.netty.util.Recycler<CommandPong>() {
- protected CommandPong newObject(Handle<CommandPong> handle) {
+ protected CommandPong newObject(Handle handle) {
return new CommandPong(handle);
}
};
@@ -18115,12 +18055,10 @@ public final class PulsarApi {
this.initFields();
this.memoizedIsInitialized = -1;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandPong(boolean noInit) {
- this.handle = null;
- }
+ private CommandPong(boolean noInit) {}
private static final CommandPong defaultInstance;
public static CommandPong getDefaultInstance() {
@@ -18247,20 +18185,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandPong, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPongOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPong.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -18367,13 +18305,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandConsumerStatsOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandConsumerStats.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandConsumerStats> handle;
- private CommandConsumerStats(io.netty.util.Recycler.Handle<CommandConsumerStats> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandConsumerStats(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandConsumerStats> RECYCLER = new io.netty.util.Recycler<CommandConsumerStats>() {
- protected CommandConsumerStats newObject(Handle<CommandConsumerStats> handle) {
+ protected CommandConsumerStats newObject(Handle handle) {
return new CommandConsumerStats(handle);
}
};
@@ -18383,12 +18321,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandConsumerStats(boolean noInit) {
- this.handle = null;
- }
+ private CommandConsumerStats(boolean noInit) {}
private static final CommandConsumerStats defaultInstance;
public static CommandConsumerStats getDefaultInstance() {
@@ -18560,20 +18496,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -18814,13 +18750,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements CommandConsumerStatsResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use CommandConsumerStatsResponse.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<CommandConsumerStatsResponse> handle;
- private CommandConsumerStatsResponse(io.netty.util.Recycler.Handle<CommandConsumerStatsResponse> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private CommandConsumerStatsResponse(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<CommandConsumerStatsResponse> RECYCLER = new io.netty.util.Recycler<CommandConsumerStatsResponse>() {
- protected CommandConsumerStatsResponse newObject(Handle<CommandConsumerStatsResponse> handle) {
+ protected CommandConsumerStatsResponse newObject(Handle handle) {
return new CommandConsumerStatsResponse(handle);
}
};
@@ -18830,12 +18766,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private CommandConsumerStatsResponse(boolean noInit) {
- this.handle = null;
- }
+ private CommandConsumerStatsResponse(boolean noInit) {}
private static final CommandConsumerStatsResponse defaultInstance;
public static CommandConsumerStatsResponse getDefaultInstance() {
@@ -19347,20 +19281,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
@@ -20186,13 +20120,13 @@ public final class PulsarApi {
com.google.protobuf.GeneratedMessageLite
implements BaseCommandOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
// Use BaseCommand.newBuilder() to construct.
- private final io.netty.util.Recycler.Handle<BaseCommand> handle;
- private BaseCommand(io.netty.util.Recycler.Handle<BaseCommand> handle) {
+ private io.netty.util.Recycler.Handle handle;
+ private BaseCommand(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
}
private static final io.netty.util.Recycler<BaseCommand> RECYCLER = new io.netty.util.Recycler<BaseCommand>() {
- protected BaseCommand newObject(Handle<BaseCommand> handle) {
+ protected BaseCommand newObject(Handle handle) {
return new BaseCommand(handle);
}
};
@@ -20202,12 +20136,10 @@ public final class PulsarApi {
this.memoizedIsInitialized = -1;
this.bitField0_ = 0;
this.memoizedSerializedSize = -1;
- handle.recycle(this);
+ if (handle != null) { RECYCLER.recycle(this, handle); }
}
- private BaseCommand(boolean noInit) {
- this.handle = null;
- }
+ private BaseCommand(boolean noInit) {}
private static final BaseCommand defaultInstance;
public static BaseCommand getDefaultInstance() {
@@ -21109,20 +21041,20 @@ public final class PulsarApi {
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand, Builder>
implements org.apache.pulsar.common.api.proto.PulsarApi.BaseCommandOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
// Construct using org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.newBuilder()
- private final io.netty.util.Recycler.Handle<Builder> handle;
- private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
this.handle = handle;
maybeForceBuilderInitialization();
}
private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
- protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
return new Builder(handle);
}
};
public void recycle() {
clear();
- handle.recycle(this);
+ if (handle != null) {RECYCLER.recycle(this, handle);}
}
private void maybeForceBuilderInitialization() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
similarity index 89%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/util/FutureUtil.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 2f0490d..e07941a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.util;
+package org.apache.pulsar.common.util;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.api.PulsarClientException;
-
public class FutureUtil {
/**
@@ -63,11 +61,7 @@ public class FutureUtil {
public static <T> CompletableFuture<T> failedFuture(Throwable t) {
CompletableFuture<T> future = new CompletableFuture<>();
- if (t instanceof PulsarClientException) {
- future.completeExceptionally(t);
- } else {
- future.completeExceptionally(new PulsarClientException(t));
- }
+ future.completeExceptionally(t);
return future;
}
}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 63123f3..1951592 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -97,11 +98,11 @@ public class BrokerDiscoveryProvider implements Closeable {
}
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DiscoveryService service,
- DestinationName destination, String role) {
+ DestinationName destination, String role, AuthenticationDataSource authenticationData) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
- checkAuthorization(service, destination, role);
+ checkAuthorization(service, destination, role, authenticationData);
final String path = path(PARTITIONED_TOPIC_PATH_ZNODE, destination.getProperty(), destination.getCluster(),
destination.getNamespacePortion(), "persistent", destination.getEncodedLocalName());
// gets the number of partitions from the zk cache
@@ -126,7 +127,8 @@ public class BrokerDiscoveryProvider implements Closeable {
return metadataFuture;
}
- protected static void checkAuthorization(DiscoveryService service, DestinationName destination, String role)
+ protected static void checkAuthorization(DiscoveryService service, DestinationName destination, String role,
+ AuthenticationDataSource authenticationData)
throws Exception {
if (!service.getConfiguration().isAuthorizationEnabled()
|| service.getConfiguration().getSuperUserRoles().contains(role)) {
@@ -134,7 +136,7 @@ public class BrokerDiscoveryProvider implements Closeable {
return;
}
// get zk policy manager
- if (!service.getAuthorizationManager().canLookup(destination, role)) {
+ if (!service.getAuthorizationService().canLookup(destination, role, authenticationData)) {
LOG.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
// check namespace authorization
PropertyAdmin propertyAdmin;
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 387ba7a..38ae7eb 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -26,7 +26,7 @@ import java.net.InetAddress;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -55,7 +55,7 @@ public class DiscoveryService implements Closeable {
private final String serviceUrlTls;
private ConfigurationCacheService configurationCacheService;
private AuthenticationService authenticationService;
- private AuthorizationManager authorizationManager;
+ private AuthorizationService authorizationService;
private ZooKeeperClientFactory zkClientFactory = null;
private BrokerDiscoveryProvider discoveryProvider;
private final EventLoopGroup acceptorGroup;
@@ -82,7 +82,7 @@ public class DiscoveryService implements Closeable {
this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
authenticationService = new AuthenticationService(serviceConfiguration);
- authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+ authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
startServer();
}
@@ -181,8 +181,8 @@ public class DiscoveryService implements Closeable {
return authenticationService;
}
- public AuthorizationManager getAuthorizationManager() {
- return authorizationManager;
+ public AuthorizationService getAuthorizationService() {
+ return authorizationService;
}
public ConfigurationCacheService getConfigurationCacheService() {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
index 4f694de..80e9f7e 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
@@ -28,6 +28,7 @@ import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
@@ -50,6 +51,7 @@ public class ServerConnection extends PulsarHandler {
private DiscoveryService service;
private String authRole = null;
+ private AuthenticationDataSource authenticationData = null;
private State state;
public static final String TLS_HANDLER = "tls";
@@ -87,8 +89,9 @@ public class ServerConnection extends PulsarHandler {
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
+ this.authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
authRole = service.getAuthenticationService()
- .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
+ .authenticate(this.authenticationData, authMethod);
LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, authRole);
} catch (AuthenticationException e) {
String msg = "Unable to authenticate";
@@ -144,7 +147,9 @@ public class ServerConnection extends PulsarHandler {
final long requestId = partitionMetadata.getRequestId();
DestinationName dn = DestinationName.get(partitionMetadata.getTopic());
- service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, authRole).thenAccept(metadata -> {
+ service.getDiscoveryProvider()
+ .getPartitionedTopicMetadata(service, dn, authRole, authenticationData)
+ .thenAccept(metadata -> {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Total number of partitions for topic {} is {}", authRole, dn, metadata.partitions);
}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index 8e9cc61..eb986f1 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.discovery.service.server;
import java.util.Properties;
import java.util.Set;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.discovery.service.web.DiscoveryServiceServlet;
@@ -67,6 +68,8 @@ public class ServiceConfig implements PulsarConfiguration {
private Set<String> authenticationProviders = Sets.newTreeSet();
// Enforce authorization
private boolean authorizationEnabled = false;
+ // Authorization provider fully qualified class-name
+ private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
/***** --- TLS --- ****/
// Enable TLS
@@ -210,6 +213,14 @@ public class ServiceConfig implements PulsarConfiguration {
this.authorizationEnabled = authorizationEnabled;
}
+ public String getAuthorizationProvider() {
+ return authorizationProvider;
+ }
+
+ public void setAuthorizationProvider(String authorizationProvider) {
+ this.authorizationProvider = authorizationProvider;
+ }
+
public Set<String> getSuperUserRoles() {
return superUserRoles;
}
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index bf83116..3351145 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -101,7 +101,7 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
public void testGetPartitionsMetadata() throws Exception {
DestinationName topic1 = DestinationName.get("persistent://test/local/ns/my-topic-1");
- PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role")
+ PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null)
.get();
assertEquals(m.partitions, 0);
@@ -109,7 +109,7 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
mockZookKeeper.failNow(Code.SESSIONEXPIRED);
DestinationName topic2 = DestinationName.get("persistent://test/local/ns/my-topic-2");
CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
- .getPartitionedTopicMetadata(service, topic2, "role");
+ .getPartitionedTopicMetadata(service, topic2, "role", null);
try {
future.get();
fail("Partition metadata lookup should have failed");
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index c9444ff..8faa3da 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.server;
import static org.apache.bookkeeper.util.MathUtils.signSafeMod;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
import java.io.Closeable;
@@ -31,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import io.netty.util.concurrent.DefaultThreadFactory;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
/**
* Maintains available active broker list and returns next active broker in round-robin for discovery service.
@@ -96,11 +97,11 @@ public class BrokerDiscoveryProvider implements Closeable {
}
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(ProxyService service,
- DestinationName destination, String role) {
+ DestinationName destination, String role, AuthenticationDataSource authenticationData) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
- checkAuthorization(service, destination, role);
+ checkAuthorization(service, destination, role, authenticationData);
final String path = path(PARTITIONED_TOPIC_PATH_ZNODE, destination.getProperty(), destination.getCluster(),
destination.getNamespacePortion(), "persistent", destination.getEncodedLocalName());
// gets the number of partitions from the zk cache
@@ -125,15 +126,15 @@ public class BrokerDiscoveryProvider implements Closeable {
return metadataFuture;
}
- protected static void checkAuthorization(ProxyService service, DestinationName destination, String role)
- throws Exception {
+ protected static void checkAuthorization(ProxyService service, DestinationName destination, String role,
+ AuthenticationDataSource authenticationData) throws Exception {
if (!service.getConfiguration().isAuthorizationEnabled()
|| service.getConfiguration().getSuperUserRoles().contains(role)) {
// No enforcing of authorization policies
return;
}
// get zk policy manager
- if (!service.getAuthorizationManager().canLookup(destination, role)) {
+ if (!service.getAuthorizationService().canLookup(destination, role, authenticationData)) {
LOG.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
// check namespace authorization
PropertyAdmin propertyAdmin;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index f9ba3c1..7d4d683 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -161,7 +161,8 @@ public class LookupProxyHandler {
final long clientRequestId = partitionMetadata.getRequestId();
DestinationName dn = DestinationName.get(partitionMetadata.getTopic());
if (isBlank(brokerServiceURL)) {
- service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
+ service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole,
+ proxyConnection.authenticationData)
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}",
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e5906ba..5dba6f8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.proxy.server;
import java.util.Properties;
import java.util.Set;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import com.google.common.collect.Sets;
@@ -59,10 +60,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
private Set<String> authenticationProviders = Sets.newTreeSet();
// Enforce authorization
private boolean authorizationEnabled = false;
+ // Authorization provider fully qualified class-name
+ private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
// Forward client authData to Broker for re authorization
// make sure authentication is enabled for this to take effect
private boolean forwardAuthorizationCredentials = false;
-
+
// Authentication settings of the proxy itself. Used to connect to brokers
private String brokerClientAuthenticationPlugin;
private String brokerClientAuthenticationParameters;
@@ -263,6 +266,14 @@ public class ProxyConfiguration implements PulsarConfiguration {
this.authorizationEnabled = authorizationEnabled;
}
+ public String getAuthorizationProvider() {
+ return authorizationProvider;
+ }
+
+ public void setAuthorizationProvider(String authorizationProvider) {
+ this.authorizationProvider = authorizationProvider;
+ }
+
public Set<String> getSuperUserRoles() {
return superUserRoles;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 68bd022..89d737d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,6 +27,7 @@ import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -55,6 +56,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
String clientAuthRole = null;
String clientAuthData = null;
String clientAuthMethod = null;
+ AuthenticationDataSource authenticationData;
private State state;
private LookupProxyHandler lookupProxyHandler = null;
@@ -162,7 +164,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
close();
return;
}
-
+
if (connect.hasProxyToBrokerUrl()) {
// Client already knows which broker to connect. Let's open a connection
// there and just pass bytes in both directions
@@ -223,8 +225,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
+ authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
clientAuthRole = service.getAuthenticationService()
- .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
+ .authenticate(authenticationData, authMethod);
LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
clientAuthRole);
return true;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index e5c2e91..0ddee42 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -28,7 +28,7 @@ import java.net.UnknownHostException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
@@ -60,7 +60,7 @@ public class ProxyService implements Closeable {
private final String serviceUrlTls;
private ConfigurationCacheService configurationCacheService;
private AuthenticationService authenticationService;
- private AuthorizationManager authorizationManager;
+ private AuthorizationService authorizationService;
private ZooKeeperClientFactory zkClientFactory = null;
private final EventLoopGroup acceptorGroup;
@@ -126,7 +126,7 @@ public class ProxyService implements Closeable {
});
discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
- authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+ authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
}
ServerBootstrap bootstrap = new ServerBootstrap();
@@ -200,8 +200,8 @@ public class ProxyService implements Closeable {
return authenticationService;
}
- public AuthorizationManager getAuthorizationManager() {
- return authorizationManager;
+ public AuthorizationService getAuthorizationService() {
+ return authorizationService;
}
public Authentication getClientAuthentication() {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 9d15892..c308175 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -37,8 +37,8 @@ import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 5a77a00..91e62b7 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -30,6 +30,8 @@ import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.naming.DestinationName;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -63,6 +65,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
private boolean checkAuth(ServletUpgradeResponse response) {
String authRole = "<none>";
+ AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request);
if (service.isAuthenticationEnabled()) {
try {
authRole = service.getAuthenticationService().authenticateHttpRequest(request);
@@ -84,7 +87,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
if (service.isAuthorizationEnabled()) {
try {
- if (!isAuthorized(authRole)) {
+ if (!isAuthorized(authRole, authenticationData)) {
log.warn("[{}:{}] WebSocket Client [{}] is not authorized on topic {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic);
response.sendError(HttpServletResponse.SC_FORBIDDEN, "Not authorized");
@@ -167,7 +170,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
return dn.toString();
}
- protected abstract Boolean isAuthorized(String authRole) throws Exception;
+ protected abstract Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception;
private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class);
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 27f62b1..b392e0c 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.MessageId;
@@ -282,8 +283,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
@Override
- protected Boolean isAuthorized(String authRole) throws Exception {
- return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription);
+ protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+ return service.getAuthorizationService().canConsume(DestinationName.get(topic), authRole, authenticationData,
+ this.subscription);
}
private static String extractSubscription(HttpServletRequest request) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index e551885..cb2e288 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -18,15 +18,14 @@
*/
package org.apache.pulsar.websocket;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
-import static org.apache.pulsar.websocket.WebSocketError.FailedToCreateProducer;
import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
import java.io.IOException;
import java.util.Base64;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
@@ -34,6 +33,7 @@ import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
@@ -47,15 +47,12 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
-import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;
-import static com.google.common.base.Preconditions.checkArgument;
/**
@@ -224,8 +221,8 @@ public class ProducerHandler extends AbstractWebSocketHandler {
}
@Override
- protected Boolean isAuthorized(String authRole) throws Exception {
- return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole);
+ protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+ return service.getAuthorizationService().canProduce(DestinationName.get(topic), authRole, authenticationData);
}
private void sendAckResponse(ProducerAck response) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index eae7717..4d6c271 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
@@ -246,8 +247,9 @@ public class ReaderHandler extends AbstractWebSocketHandler {
}
@Override
- protected Boolean isAuthorized(String authRole) throws Exception {
- return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription);
+ protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+ return service.getAuthorizationService().canConsume(DestinationName.get(topic), authRole, authenticationData,
+ this.subscription);
}
private MessageId getMessageId() throws IOException {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 02a589f..d5a2c84 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -34,7 +34,7 @@ import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
@@ -64,7 +64,7 @@ public class WebSocketService implements Closeable {
public static final int MaxTextFrameSize = 1024 * 1024;
AuthenticationService authenticationService;
- AuthorizationManager authorizationManager;
+ AuthorizationService authorizationService;
PulsarClient pulsarClient;
private final ScheduledExecutorService executor = Executors
@@ -112,13 +112,13 @@ public class WebSocketService implements Closeable {
log.info("Global Zookeeper cache started");
}
- // start authorizationManager
+ // start authorizationService
if (config.isAuthorizationEnabled()) {
if (configurationCacheService == null) {
throw new PulsarServerException(
"Failed to initialize authorization manager due to empty GlobalZookeeperServers");
}
- authorizationManager = new AuthorizationManager(this.config, configurationCacheService);
+ authorizationService = new AuthorizationService(this.config, configurationCacheService);
}
// start authentication service
authenticationService = new AuthenticationService(this.config);
@@ -147,8 +147,8 @@ public class WebSocketService implements Closeable {
return authenticationService;
}
- public AuthorizationManager getAuthorizationManager() {
- return authorizationManager;
+ public AuthorizationService getAuthorizationService() {
+ return authorizationService;
}
public ZooKeeperCache getGlobalZkCache() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
index 122c12b..a634e18 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.websocket.admin;
import static org.apache.commons.lang3.StringUtils.isBlank;
-
import javax.naming.AuthenticationException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@@ -28,6 +27,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.websocket.WebSocketService;
import org.slf4j.Logger;
@@ -50,7 +50,8 @@ public class WebSocketWebResource {
private WebSocketService socketService;
private String clientId;
-
+ private AuthenticationDataHttps authData;
+
protected WebSocketService service() {
if (socketService == null) {
socketService = (WebSocketService) servletContext.getAttribute(ATTRIBUTE_PROXY_SERVICE_NAME);
@@ -79,8 +80,14 @@ public class WebSocketWebResource {
}
return clientId;
}
-
-
+
+ public AuthenticationDataHttps authData() {
+ if (authData == null) {
+ authData = new AuthenticationDataHttps(httpRequest);
+ }
+ return authData;
+ }
+
/**
* Checks whether the user has Pulsar Super-User access to the system.
*
@@ -128,8 +135,7 @@ public class WebSocketWebResource {
*/
protected boolean isAuthorized(DestinationName topic) throws Exception {
if (service().isAuthorizationEnabled()) {
- String authRole = clientAppId();
- return service().getAuthorizationManager().canLookup(topic, authRole);
+ return service().getAuthorizationService().canLookup(topic, clientAppId(), authData());
}
return true;
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 6f9536c..5cea3df 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.service;
import java.util.Properties;
import java.util.Set;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -63,6 +64,10 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
private Set<String> authenticationProviders = Sets.newTreeSet();
// Enforce authorization
private boolean authorizationEnabled;
+ // Authorization provider fully qualified class-name
+ private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
+
+
// Role names that are treated as "super-user", meaning they will be able to
// do all admin operations and publish/consume from all topics
private Set<String> superUserRoles = Sets.newTreeSet();
@@ -202,6 +207,14 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
this.authorizationEnabled = authorizationEnabled;
}
+ public String getAuthorizationProvider() {
+ return authorizationProvider;
+ }
+
+ public void setAuthorizationProvider(String authorizationProvider) {
+ this.authorizationProvider = authorizationProvider;
+ }
+
public boolean getAuthorizationAllowWildcardsMatching() {
return authorizationAllowWildcardsMatching;
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.