You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/16 21:04:53 UTC

[pulsar] branch master updated: [PIP-45] Remove ConfigurationCacheService from AuthorizationProvider (#12064)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23ffdb7  [PIP-45] Remove ConfigurationCacheService from AuthorizationProvider (#12064)
23ffdb7 is described below

commit 23ffdb778e25169bfc7343920912521b41bf95f5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 16 14:04:08 2021 -0700

    [PIP-45] Remove ConfigurationCacheService from AuthorizationProvider (#12064)
    
    * [PIP-45] Remove ConfigurationCacheService from AuthorizationProvider
    
    * Fixed DiscoveryService
    
    * Fixed WebSocketService
    
    * Fixed references in Worker
    
    * Fixed BrokerService
    
    * Fixed ServerCnxTest
    
    * Fixed ProxyService
---
 .../authorization/AuthorizationProvider.java       | 23 ++++++++++++++++-
 .../broker/authorization/AuthorizationService.java |  5 ++--
 .../MultiRolesTokenAuthorizationProvider.java      |  5 ++--
 .../authorization/PulsarAuthorizationProvider.java | 10 ++++----
 .../pulsar/broker/service/BrokerService.java       |  2 +-
 .../broker/auth/MockAuthorizationProvider.java     |  5 +---
 .../pulsar/broker/service/ServerCnxTest.java       | 14 +++++-----
 .../api/AuthorizationProducerConsumerTest.java     |  3 ++-
 .../impl/PatternTopicsConsumerImplAuthTest.java    |  3 ++-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  5 ++--
 .../pulsar/discovery/service/DiscoveryService.java | 12 +--------
 .../org/apache/pulsar/functions/worker/Worker.java | 11 +-------
 .../apache/pulsar/proxy/server/ProxyService.java   | 30 +++++-----------------
 .../apache/pulsar/websocket/WebSocketService.java  | 20 +++------------
 14 files changed, 62 insertions(+), 86 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 984cf92..c83ae4c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -93,8 +94,28 @@ public interface AuthorizationProvider extends Closeable {
      *            pulsar zk configuration cache service
      * @throws IOException
      *             if the initialization fails
+     *
+     * @deprecated ConfigurationCacheService is not supported anymore as a way to get access to metadata.
+     * @see #initialize(ServiceConfiguration, PulsarResources)
      */
-    void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
+    @Deprecated
+    default void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+    }
+
+    /**
+     * Perform initialization for the authorization provider
+     *
+     * @param conf
+     *            broker config object
+     * @param pulsarResources
+     *            Resources component for access to metadata
+     * @throws IOException
+     *             if the initialization fails
+     */
+    default void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
+        // For compatibility, call the old deprecated initialize
+        initialize(conf, (ConfigurationCacheService) null);
+    }
 
     /**
      * Check if the specified role has permission to send messages to the specified fully qualified topic name.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 26d0477..e08ce79 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -55,14 +56,14 @@ public class AuthorizationService {
     private AuthorizationProvider provider;
     private final ServiceConfiguration conf;
 
-    public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService configCache)
+    public AuthorizationService(ServiceConfiguration conf, PulsarResources pulsarResources)
             throws PulsarServerException {
         this.conf = conf;
         try {
             final String providerClassname = conf.getAuthorizationProvider();
             if (StringUtils.isNotBlank(providerClassname)) {
                 provider = (AuthorizationProvider) Class.forName(providerClassname).newInstance();
-                provider.initialize(conf, configCache);
+                provider.initialize(conf, pulsarResources);
                 log.info("{} has been loaded.", providerClassname);
             } else {
                 throw new PulsarServerException("No authorization providers are present.");
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index 23fb7bc..dcdf779 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -67,7 +68,7 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
     }
 
     @Override
-    public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+    public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
         String prefix = (String) conf.getProperty(CONF_TOKEN_SETTING_PREFIX);
         if (null == prefix) {
             prefix = "";
@@ -78,7 +79,7 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
             this.roleClaim = (String) tokenAuthClaim;
         }
 
-        super.initialize(conf, configCache);
+        super.initialize(conf, pulsarResources);
     }
 
     private List<String> getRoles(AuthenticationDataSource authData) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index c713fe2..3796512 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -63,17 +63,17 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
     public PulsarAuthorizationProvider() {
     }
 
-    public PulsarAuthorizationProvider(ServiceConfiguration conf, ConfigurationCacheService configCache)
+    public PulsarAuthorizationProvider(ServiceConfiguration conf, PulsarResources resources)
             throws IOException {
-        initialize(conf, configCache);
+        initialize(conf, resources);
     }
 
     @Override
-    public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+    public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
         checkNotNull(conf, "ServiceConfiguration can't be null");
-        checkNotNull(configCache, "ConfigurationCacheService can't be null");
+        checkNotNull(pulsarResources, "PulsarResources can't be null");
         this.conf = conf;
-        this.pulsarResources = configCache.getPulsarResources();
+        this.pulsarResources = pulsarResources;
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 81c9ee8..3b10cea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -285,7 +285,7 @@ public class BrokerService implements Closeable {
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
         this.authorizationService = new AuthorizationService(
-                pulsar.getConfiguration(), pulsar.getConfigurationCache());
+                pulsar.getConfiguration(), pulsar().getPulsarResources());
 
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
         pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
index 421da4f..ad6327d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthorizationProvider.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationProvider;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -64,10 +65,6 @@ public class MockAuthorizationProvider implements AuthorizationProvider {
     }
 
     @Override
-    public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
-    }
-
-    @Override
     public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
                                                       AuthenticationDataSource authenticationData) {
         return roleAuthorizedAsync(role);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 6376c70..25cac83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -528,13 +528,14 @@ public class ServerCnxTest {
         doReturn(zkDataCache).when(configCacheService).policiesCache();
         doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(matches(".*nonexistent.*"));
 
-        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
         svcConfig.setAuthorizationEnabled(true);
         Field providerField = AuthorizationService.class.getDeclaredField("provider");
         providerField.setAccessible(true);
-        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig,
+                pulsar.getPulsarResources()));
         providerField.set(authorizationService, authorizationProvider);
         doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
 
@@ -560,10 +561,11 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testClusterAccess() throws Exception {
         svcConfig.setAuthorizationEnabled(true);
-        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
         Field providerField = AuthorizationService.class.getDeclaredField("provider");
         providerField.setAccessible(true);
-        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig,
+                pulsar.getPulsarResources()));
         providerField.set(authorizationService, authorizationProvider);
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -590,12 +592,12 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testNonExistentTopicSuperUserAccess() throws Exception {
-        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, pulsar.getPulsarResources()));
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
         Field providerField = AuthorizationService.class.getDeclaredField("provider");
         providerField.setAccessible(true);
-        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
         providerField.set(authorizationService, authorizationProvider);
         doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index e346086..0ad79ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationProvider;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -459,7 +460,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
 
         @Override
-        public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+        public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
             this.conf = conf;
             // No-op
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index af47c79..b1328a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authorization.AuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -258,7 +259,7 @@ public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
         }
 
         @Override
-        public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+        public void initialize(ServiceConfiguration conf, PulsarResources resources) throws IOException {
             this.conf = conf;
             // No-op
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index fcb2a44..5867bb7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.Sets;
 import java.io.Closeable;
 import java.io.File;
@@ -43,7 +42,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -138,7 +137,7 @@ public class PulsarFunctionTlsTest {
         functionsWorkerService.init(workerConfig, null, false);
 
         AuthenticationService authenticationService = new AuthenticationService(config);
-        AuthorizationService authorizationService = new AuthorizationService(config, mock(ConfigurationCacheService.class));
+        AuthorizationService authorizationService = new AuthorizationService(config, mock(PulsarResources.class));
         when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
         when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
         when(functionsWorkerService.isInitialized()).thenReturn(true);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 3addd07..5e4b081 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -61,7 +61,6 @@ public class DiscoveryService implements Closeable {
     private final ServiceConfig config;
     private String serviceUrl;
     private String serviceUrlTls;
-    private ConfigurationMetadataCacheService configurationCacheService;
     private AuthenticationService authenticationService;
     private AuthorizationService authorizationService;
     private BrokerDiscoveryProvider discoveryProvider;
@@ -96,10 +95,9 @@ public class DiscoveryService implements Closeable {
         configMetadataStore = createConfigurationMetadataStore();
         pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
         discoveryProvider = new BrokerDiscoveryProvider(this.config, pulsarResources);
-        this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
         ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
         authenticationService = new AuthenticationService(serviceConfiguration);
-        authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
+        authorizationService = new AuthorizationService(serviceConfiguration, pulsarResources);
         startServer();
     }
 
@@ -216,14 +214,6 @@ public class DiscoveryService implements Closeable {
         return authorizationService;
     }
 
-    public ConfigurationCacheService getConfigurationCacheService() {
-        return configurationCacheService;
-    }
-
-    public void setConfigurationCacheService(ConfigurationMetadataCacheService configurationCacheService) {
-        this.configurationCacheService = configurationCacheService;
-    }
-
     public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
         return PulsarResources.createMetadataStore(config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs());
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 8627240..df3ce76 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -18,25 +18,19 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
 import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 
@@ -51,7 +45,6 @@ public class Worker {
     private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
     private PulsarResources pulsarResources;
     private MetadataStoreExtended configMetadataStore;
-    private ConfigurationMetadataCacheService configurationCacheService;
     private final ErrorNotifier errorNotifier;
 
     public Worker(WorkerConfig workerConfig) {
@@ -89,9 +82,7 @@ public class Worker {
                 throw new PulsarServerException(e);
             }
             pulsarResources = new PulsarResources(null, configMetadataStore);
-            this.configurationCacheService = new ConfigurationMetadataCacheService(this.pulsarResources,
-                    this.workerConfig.getPulsarFunctionsCluster());
-                return new AuthorizationService(getServiceConfiguration(), this.configurationCacheService);
+            return new AuthorizationService(getServiceConfiguration(), this.pulsarResources);
             }
         return null;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 53f5c9d..af83c5d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -20,20 +20,18 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
-
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
-import lombok.Getter;
-import lombok.Setter;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -45,26 +43,22 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.proxy.stats.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 /**
  * Pulsar proxy service
  */
@@ -74,7 +68,6 @@ public class ProxyService implements Closeable {
     private final Timer timer;
     private String serviceUrl;
     private String serviceUrlTls;
-    private ConfigurationMetadataCacheService configurationCacheService;
     private final AuthenticationService authenticationService;
     private AuthorizationService authorizationService;
     private MetadataStoreExtended localMetadataStore;
@@ -174,9 +167,8 @@ public class ProxyService implements Closeable {
             configMetadataStore = createConfigurationMetadataStore();
             pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
             discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, pulsarResources);
-            this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
             authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig),
-                                                            configurationCacheService);
+                                                            pulsarResources);
         }
 
         ServerBootstrap bootstrap = new ServerBootstrap();
@@ -294,14 +286,6 @@ public class ProxyService implements Closeable {
         return authorizationService;
     }
 
-    public ConfigurationCacheService getConfigurationCacheService() {
-        return configurationCacheService;
-    }
-
-    public void setConfigurationCacheService(ConfigurationMetadataCacheService configurationCacheService) {
-        this.configurationCacheService = configurationCacheService;
-    }
-
     public Semaphore getLookupRequestSemaphore() {
         return lookupRequestSemaphore.get();
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 119594a..0753dd2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -19,24 +19,20 @@
 package org.apache.pulsar.websocket;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import javax.servlet.ServletException;
 import javax.websocket.DeploymentException;
-
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -53,8 +49,6 @@ import org.apache.pulsar.websocket.stats.ProxyStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Socket proxy server which initializes other dependent services and starts server by opening web-socket end-point url.
  *
@@ -73,7 +67,6 @@ public class WebSocketService implements Closeable {
     private PulsarResources pulsarResources;
     private MetadataStoreExtended configMetadataStore;
     private ServiceConfiguration config;
-    private ConfigurationMetadataCacheService configurationCacheService;
 
     private ClusterData localCluster;
     private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
@@ -105,16 +98,15 @@ public class WebSocketService implements Closeable {
                 throw new PulsarServerException(e);
             }
             pulsarResources = new PulsarResources(null, configMetadataStore);
-            this.configurationCacheService = new ConfigurationMetadataCacheService(pulsarResources, null);
         }
 
         // start authorizationService
         if (config.isAuthorizationEnabled()) {
-            if (configurationCacheService == null) {
+            if (pulsarResources == null) {
                 throw new PulsarServerException(
                         "Failed to initialize authorization manager due to empty ConfigurationStoreServers");
             }
-            authorizationService = new AuthorizationService(this.config, configurationCacheService);
+            authorizationService = new AuthorizationService(this.config, pulsarResources);
         }
         // start authentication service
         authenticationService = new AuthenticationService(this.config);
@@ -221,7 +213,7 @@ public class WebSocketService implements Closeable {
     }
 
     private ClusterData retrieveClusterData() throws PulsarServerException {
-        if (configurationCacheService == null) {
+        if (pulsarResources == null) {
             throw new PulsarServerException(
                 "Failed to retrieve Cluster data due to empty ConfigurationStoreServers");
         }
@@ -237,10 +229,6 @@ public class WebSocketService implements Closeable {
         return proxyStats;
     }
 
-    public ConfigurationCacheService getConfigurationCache() {
-        return configurationCacheService;
-    }
-
     public ScheduledExecutorService getExecutor() {
         return executor;
     }