You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/16 21:04:53 UTC
[pulsar] branch master updated: [PIP-45] Remove
ConfigurationCacheService from AuthorizationProvider (#12064)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 23ffdb7 [PIP-45] Remove ConfigurationCacheService from AuthorizationProvider (#12064)
23ffdb7 is described below
commit 23ffdb778e25169bfc7343920912521b41bf95f5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 16 14:04:08 2021 -0700
[PIP-45] Remove ConfigurationCacheService from AuthorizationProvider (#12064)
* [PIP-45] Remove ConfigurationCacheService from AuthorizationProvider
* Fixed DiscoveryService
* Fixed WebSocketService
* Fixed references in Worker
* Fixed BrokerService
* Fixed ServerCnxTest
* Fixed ProxyService
---
.../authorization/AuthorizationProvider.java | 23 ++++++++++++++++-
.../broker/authorization/AuthorizationService.java | 5 ++--
.../MultiRolesTokenAuthorizationProvider.java | 5 ++--
.../authorization/PulsarAuthorizationProvider.java | 10 ++++----
.../pulsar/broker/service/BrokerService.java | 2 +-
.../broker/auth/MockAuthorizationProvider.java | 5 +---
.../pulsar/broker/service/ServerCnxTest.java | 14 +++++-----
.../api/AuthorizationProducerConsumerTest.java | 3 ++-
.../impl/PatternTopicsConsumerImplAuthTest.java | 3 ++-
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 5 ++--
.../pulsar/discovery/service/DiscoveryService.java | 12 +--------
.../org/apache/pulsar/functions/worker/Worker.java | 11 +-------
.../apache/pulsar/proxy/server/ProxyService.java | 30 +++++-----------------
.../apache/pulsar/websocket/WebSocketService.java | 20 +++------------
14 files changed, 62 insertions(+), 86 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 984cf92..c83ae4c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -93,8 +94,28 @@ public interface AuthorizationProvider extends Closeable {
* pulsar zk configuration cache service
* @throws IOException
* if the initialization fails
+ *
+ * @deprecated ConfigurationCacheService is not supported anymore as a way to get access to metadata.
+ * @see #initialize(ServiceConfiguration, PulsarResources)
*/
- void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
+ @Deprecated
+ default void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ }
+
+ /**
+ * Perform initialization for the authorization provider
+ *
+ * @param conf
+ * broker config object
+ * @param pulsarResources
+ * Resources component for access to metadata
+ * @throws IOException
+ * if the initialization fails
+ */
+ default void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
+ // For compatibility, call the old deprecated initialize
+ initialize(conf, (ConfigurationCacheService) null);
+ }
/**
* Check if the specified role has permission to send messages to the specified fully qualified topic name.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 26d0477..e08ce79 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -55,14 +56,14 @@ public class AuthorizationService {
private AuthorizationProvider provider;
private final ServiceConfiguration conf;
- public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService configCache)
+ public AuthorizationService(ServiceConfiguration conf, PulsarResources pulsarResources)
throws PulsarServerException {
this.conf = conf;
try {
final String providerClassname = conf.getAuthorizationProvider();
if (StringUtils.isNotBlank(providerClassname)) {
provider = (AuthorizationProvider) Class.forName(providerClassname).newInstance();
- provider.initialize(conf, configCache);
+ provider.initialize(conf, pulsarResources);
log.info("{} has been loaded.", providerClassname);
} else {
throw new PulsarServerException("No authorization providers are present.");
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index 23fb7bc..dcdf779 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -67,7 +68,7 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
}
@Override
- public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
String prefix = (String) conf.getProperty(CONF_TOKEN_SETTING_PREFIX);
if (null == prefix) {
prefix = "";
@@ -78,7 +79,7 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
this.roleClaim = (String) tokenAuthClaim;
}
- super.initialize(conf, configCache);
+ super.initialize(conf, pulsarResources);
}
private List<String> getRoles(AuthenticationDataSource authData) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index c713fe2..3796512 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -63,17 +63,17 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
public PulsarAuthorizationProvider() {
}
- public PulsarAuthorizationProvider(ServiceConfiguration conf, ConfigurationCacheService configCache)
+ public PulsarAuthorizationProvider(ServiceConfiguration conf, PulsarResources resources)
throws IOException {
- initialize(conf, configCache);
+ initialize(conf, resources);
}
@Override
- public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
checkNotNull(conf, "ServiceConfiguration can't be null");
- checkNotNull(configCache, "ConfigurationCacheService can't be null");
+ checkNotNull(pulsarResources, "PulsarResources can't be null");
this.conf = conf;
- this.pulsarResources = configCache.getPulsarResources();
+ this.pulsarResources = pulsarResources;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 81c9ee8..3b10cea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -285,7 +285,7 @@ public class BrokerService implements Closeable {
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
- pulsar.getConfiguration(), pulsar.getConfigurationCache());
+ pulsar.getConfiguration(), pulsar().getPulsarResources());
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
index 421da4f..ad6327d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -64,10 +65,6 @@ public class MockAuthorizationProvider implements AuthorizationProvider {
}
@Override
- public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
- }
-
- @Override
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData) {
return roleAuthorizedAsync(role);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 6376c70..25cac83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -528,13 +528,14 @@ public class ServerCnxTest {
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(matches(".*nonexistent.*"));
- AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+ AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
svcConfig.setAuthorizationEnabled(true);
Field providerField = AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+ PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig,
+ pulsar.getPulsarResources()));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
@@ -560,10 +561,11 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testClusterAccess() throws Exception {
svcConfig.setAuthorizationEnabled(true);
- AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+ AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
Field providerField = AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+ PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig,
+ pulsar.getPulsarResources()));
providerField.set(authorizationService, authorizationProvider);
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -590,12 +592,12 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testNonExistentTopicSuperUserAccess() throws Exception {
- AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+ AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
Field providerField = AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+ PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index e346086..0ad79ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -459,7 +460,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
}
@Override
- public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
this.conf = conf;
// No-op
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index af47c79..b1328a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -258,7 +259,7 @@ public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
}
@Override
- public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+ public void initialize(ServiceConfiguration conf, PulsarResources resources) throws IOException {
this.conf = conf;
// No-op
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index fcb2a44..5867bb7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.File;
@@ -43,7 +42,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -138,7 +137,7 @@ public class PulsarFunctionTlsTest {
functionsWorkerService.init(workerConfig, null, false);
AuthenticationService authenticationService = new AuthenticationService(config);
- AuthorizationService authorizationService = new AuthorizationService(config, mock(ConfigurationCacheService.class));
+ AuthorizationService authorizationService = new AuthorizationService(config, mock(PulsarResources.class));
when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
when(functionsWorkerService.isInitialized()).thenReturn(true);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 3addd07..5e4b081 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -61,7 +61,6 @@ public class DiscoveryService implements Closeable {
private final ServiceConfig config;
private String serviceUrl;
private String serviceUrlTls;
- private ConfigurationMetadataCacheService configurationCacheService;
private AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private BrokerDiscoveryProvider discoveryProvider;
@@ -96,10 +95,9 @@ public class DiscoveryService implements Closeable {
configMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
discoveryProvider = new BrokerDiscoveryProvider(this.config, pulsarResources);
- this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
authenticationService = new AuthenticationService(serviceConfiguration);
- authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
+ authorizationService = new AuthorizationService(serviceConfiguration, pulsarResources);
startServer();
}
@@ -216,14 +214,6 @@ public class DiscoveryService implements Closeable {
return authorizationService;
}
- public ConfigurationCacheService getConfigurationCacheService() {
- return configurationCacheService;
- }
-
- public void setConfigurationCacheService(ConfigurationMetadataCacheService configurationCacheService) {
- this.configurationCacheService = configurationCacheService;
- }
-
public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return PulsarResources.createMetadataStore(config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 8627240..df3ce76 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -18,25 +18,19 @@
*/
package org.apache.pulsar.functions.worker;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
@@ -51,7 +45,6 @@ public class Worker {
private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
private PulsarResources pulsarResources;
private MetadataStoreExtended configMetadataStore;
- private ConfigurationMetadataCacheService configurationCacheService;
private final ErrorNotifier errorNotifier;
public Worker(WorkerConfig workerConfig) {
@@ -89,9 +82,7 @@ public class Worker {
throw new PulsarServerException(e);
}
pulsarResources = new PulsarResources(null, configMetadataStore);
- this.configurationCacheService = new ConfigurationMetadataCacheService(this.pulsarResources,
- this.workerConfig.getPulsarFunctionsCluster());
- return new AuthorizationService(getServiceConfiguration(), this.configurationCacheService);
+ return new AuthorizationService(getServiceConfiguration(), this.pulsarResources);
}
return null;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 53f5c9d..af83c5d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -20,20 +20,18 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
-
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
-import lombok.Getter;
-import lombok.Setter;
-
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -45,26 +43,22 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
+import lombok.Getter;
+import lombok.Setter;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
/**
* Pulsar proxy service
*/
@@ -74,7 +68,6 @@ public class ProxyService implements Closeable {
private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
- private ConfigurationMetadataCacheService configurationCacheService;
private final AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private MetadataStoreExtended localMetadataStore;
@@ -174,9 +167,8 @@ public class ProxyService implements Closeable {
configMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, pulsarResources);
- this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig),
- configurationCacheService);
+ pulsarResources);
}
ServerBootstrap bootstrap = new ServerBootstrap();
@@ -294,14 +286,6 @@ public class ProxyService implements Closeable {
return authorizationService;
}
- public ConfigurationCacheService getConfigurationCacheService() {
- return configurationCacheService;
- }
-
- public void setConfigurationCacheService(ConfigurationMetadataCacheService configurationCacheService) {
- this.configurationCacheService = configurationCacheService;
- }
-
public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 119594a..0753dd2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -19,24 +19,20 @@
package org.apache.pulsar.websocket;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
-
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
@@ -53,8 +49,6 @@ import org.apache.pulsar.websocket.stats.ProxyStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Socket proxy server which initializes other dependent services and starts server by opening web-socket end-point url.
*
@@ -73,7 +67,6 @@ public class WebSocketService implements Closeable {
private PulsarResources pulsarResources;
private MetadataStoreExtended configMetadataStore;
private ServiceConfiguration config;
- private ConfigurationMetadataCacheService configurationCacheService;
private ClusterData localCluster;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
@@ -105,16 +98,15 @@ public class WebSocketService implements Closeable {
throw new PulsarServerException(e);
}
pulsarResources = new PulsarResources(null, configMetadataStore);
- this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
}
// start authorizationService
if (config.isAuthorizationEnabled()) {
- if (configurationCacheService == null) {
+ if (pulsarResources == null) {
throw new PulsarServerException(
"Failed to initialize authorization manager due to empty ConfigurationStoreServers");
}
- authorizationService = new AuthorizationService(this.config, configurationCacheService);
+ authorizationService = new AuthorizationService(this.config, pulsarResources);
}
// start authentication service
authenticationService = new AuthenticationService(this.config);
@@ -221,7 +213,7 @@ public class WebSocketService implements Closeable {
}
private ClusterData retrieveClusterData() throws PulsarServerException {
- if (configurationCacheService == null) {
+ if (pulsarResources == null) {
throw new PulsarServerException(
"Failed to retrieve Cluster data due to empty ConfigurationStoreServers");
}
@@ -237,10 +229,6 @@ public class WebSocketService implements Closeable {
return proxyStats;
}
- public ConfigurationCacheService getConfigurationCache() {
- return configurationCacheService;
- }
-
public ScheduledExecutorService getExecutor() {
return executor;
}