You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2021/01/30 03:04:31 UTC

[pulsar] branch master updated: [pulsar-broker] broker resources use metadata-store api (#9346)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fa66a24  [pulsar-broker] broker resources use metadata-store api (#9346)
fa66a24 is described below

commit fa66a2410e3266ad5b01f00f96504e84fe6084cd
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Jan 29 19:03:36 2021 -0800

    [pulsar-broker] broker resources use metadata-store api (#9346)
    
    * [pulsar-broker] broker resources use metadata-store api
    
    fix test
    
    * fix api
---
 .../org/apache/pulsar/broker/PulsarService.java    |  2 +-
 .../apache/pulsar/broker/admin/AdminResource.java  | 27 ----------
 .../pulsar/broker/admin/impl/BaseResources.java    | 25 +++++----
 .../pulsar/broker/admin/impl/BrokersBase.java      | 61 ++++++----------------
 .../pulsar/broker/admin/impl/ClustersBase.java     |  8 +--
 ...ces.java => DynamicConfigurationResources.java} | 18 +++----
 .../pulsar/broker/admin/impl/PulsarResources.java  |  4 +-
 .../pulsar/broker/web/PulsarWebResource.java       | 29 ++++++++++
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  8 ---
 9 files changed, 77 insertions(+), 105 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3e03297..fc8f309 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -484,7 +484,7 @@ public class PulsarService implements AutoCloseable {
             coordinationService = new CoordinationServiceImpl(localMetadataStore);
 
             configurationMetadataStore = createConfigurationMetadataStore();
-            pulsarResources = new PulsarResources(configurationMetadataStore);
+            pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore);
 
             orderedExecutor = OrderedExecutor.newBuilder()
                     .numThreads(config.getNumOrderedExecutorThreads())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1728c88..453c39b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -23,8 +23,6 @@ import static org.apache.pulsar.common.util.Codec.decode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
-import java.net.MalformedURLException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -39,7 +37,6 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -417,30 +414,6 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
-    /**
-     * Redirect the call to the specified broker.
-     *
-     * @param broker
-     *            Broker name
-     * @throws MalformedURLException
-     *             In case the redirect happens
-     */
-    protected void validateBrokerName(String broker) throws MalformedURLException {
-        String brokerUrl = String.format("http://%s", broker);
-        String brokerUrlTls = String.format("https://%s", broker);
-        if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
-                && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
-            String[] parts = broker.split(":");
-            checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
-            String host = parts[0];
-            int port = Integer.parseInt(parts[1]);
-
-            URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
-            log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
-            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
-        }
-    }
-
     protected Policies getNamespacePolicies(NamespaceName namespaceName) {
         try {
             final String namespace = namespaceName.toString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
index 07cd9c4..dfbad73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
@@ -97,13 +97,24 @@ public class BaseResources<T> {
         return cache.readModifyUpdate(path, modifyFunction);
     }
 
-    public void create(String path, T data) throws MetadataStoreException {
-        create(path, t -> data);
+    public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
+        try {
+            setWithCreateAsync(path, createFunction).get();
+        } catch (ExecutionException e) {
+            throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
+                    : new MetadataStoreException(e.getCause());
+        } catch (Exception e) {
+            throw new MetadataStoreException("Failed to set/create " + path, e);
+        }
     }
 
-    public void create(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
+    public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional<T>, T> createFunction) {
+        return cache.readModifyUpdateOrCreate(path, createFunction);
+    }
+
+    public void create(String path, T data) throws MetadataStoreException {
         try {
-            createAsync(path, createFunction).get();
+            createAsync(path, data).get();
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
@@ -113,11 +124,7 @@ public class BaseResources<T> {
     }
 
     public CompletableFuture<Void> createAsync(String path, T data) {
-        return createAsync(path, t -> data);
-    }
-
-    public CompletableFuture<Void> createAsync(String path, Function<Optional<T>, T> createFunction) {
-        return cache.readModifyUpdateOrCreate(path, createFunction);
+        return cache.create(path, data);
     }
 
     public void delete(String path) throws MetadataStoreException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 85549af..25b5149 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,14 +41,13 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService.State;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -57,19 +57,14 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Broker admin base.
  */
-public class BrokersBase extends AdminResource {
+public class BrokersBase extends PulsarWebResource {
     private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
-    private int serviceConfigZkVersion = -1;
 
     @GET
     @Path("/{cluster}")
@@ -90,7 +85,7 @@ public class BrokersBase extends AdminResource {
 
         try {
             // Add Native brokers
-            return pulsar().getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);
+            return new HashSet<>(dynamicConfigurationResources().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT));
         } catch (Exception e) {
             LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e);
             throw new RestException(e);
@@ -134,7 +129,7 @@ public class BrokersBase extends AdminResource {
     public void updateDynamicConfiguration(@PathParam("configName") String configName,
                                            @PathParam("configValue") String configValue) throws Exception {
         validateSuperUserAccess();
-        updateDynamicConfigurationOnZk(configName, configValue);
+        persistDynamicConfiguration(configName, configValue);
     }
 
     @DELETE
@@ -159,12 +154,8 @@ public class BrokersBase extends AdminResource {
         @ApiResponse(code = 500, message = "Internal server error")})
     public Map<String, String> getAllDynamicConfigurations() throws Exception {
         validateSuperUserAccess();
-
-        ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
-                .getDynamicConfigurationCache();
-        Map<String, String> configurationMap = null;
         try {
-            configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+            return dynamicConfigurationResources().get(BROKER_SERVICE_CONFIGURATION_PATH)
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
         } catch (RestException e) {
             LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
@@ -173,7 +164,6 @@ public class BrokersBase extends AdminResource {
             LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
             throw new RestException(e);
         }
-        return configurationMap;
     }
 
     @GET
@@ -204,29 +194,17 @@ public class BrokersBase extends AdminResource {
      * @param configValue
      *            : configuration value
      */
-    private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
+    private synchronized void persistDynamicConfiguration(String configName, String configValue) {
         try {
             if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
                 throw new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value");
             }
             if (BrokerService.isDynamicConfiguration(configName)) {
-                ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
-                        .getDynamicConfigurationCache();
-                Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
-                        .orElse(null);
-                if (configurationMap != null) {
-                    configurationMap.put(configName, configValue);
-                    byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
-                    dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
-                    serviceConfigZkVersion = localZk()
-                            .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
-                } else {
-                    configurationMap = Maps.newHashMap();
+                dynamicConfigurationResources().setWithCreate(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
+                    Map<String, String> configurationMap = old.isPresent() ? old.get() : Maps.newHashMap();
                     configurationMap.put(configName, configValue);
-                    byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
-                    ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
-                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                }
+                    return configurationMap;
+                });
                 LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
             } else {
                 if (LOG.isDebugEnabled()) {
@@ -393,17 +371,12 @@ public class BrokersBase extends AdminResource {
     private synchronized void deleteDynamicConfigurationOnZk(String configName) {
         try {
             if (BrokerService.isDynamicConfiguration(configName)) {
-                ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
-                        .getDynamicConfigurationCache();
-                Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
-                        .orElse(null);
-                if (configurationMap != null && configurationMap.containsKey(configName)) {
-                    configurationMap.remove(configName);
-                    byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
-                    dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
-                    serviceConfigZkVersion = localZk()
-                            .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
-                }
+                dynamicConfigurationResources().set(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
+                    if (old != null) {
+                        old.remove(configName);
+                    }
+                    return old;
+                });
                 LOG.info("[{}] Deleted Service configuration {}", clientAppId(), configName);
             } else {
                 if (LOG.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index b483114..a52a5c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -685,7 +685,8 @@ public class ClustersBase extends PulsarWebResource {
             NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
                     .getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap());
+                            namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
+                                    (p) -> Collections.emptyMap());
                             return new NamespaceIsolationPolicies();
                         } catch (Exception e) {
                             throw new RestException(e);
@@ -835,7 +836,8 @@ public class ClustersBase extends PulsarWebResource {
             NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
                     .getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap());
+                            namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
+                                    (p) -> Collections.emptyMap());
                             return new NamespaceIsolationPolicies();
                         } catch (Exception e) {
                             throw new RestException(e);
@@ -893,7 +895,7 @@ public class ClustersBase extends PulsarWebResource {
         try {
             String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
             FailureDomainResources failureDomainListCache = clusterResources().getFailureDomainResources();
-            failureDomainListCache.create(domainPath, old -> domain);
+            failureDomainListCache.setWithCreate(domainPath, old -> domain);
         } catch (NotFoundException nne) {
             log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), cluster,
                     domainName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
similarity index 62%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
index 4384762..99c8d3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
@@ -18,20 +18,14 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import lombok.AccessLevel;
-import lombok.Getter;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.Map;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
-@Getter(AccessLevel.PUBLIC)
-public class PulsarResources {
+public class DynamicConfigurationResources extends BaseResources<Map<String, String>> {
 
-    private TenantResources tenatResources;
-    private ClusterResources clusterResources;
-    private NamespaceResources namespaceResources;
-
-    public PulsarResources(MetadataStoreExtended configurationMetadataStore) {
-        tenatResources = new TenantResources(configurationMetadataStore);
-        clusterResources = new ClusterResources(configurationMetadataStore);
-        namespaceResources = new NamespaceResources(configurationMetadataStore);
+    public DynamicConfigurationResources(MetadataStoreExtended store) {
+        super(store, new TypeReference<Map<String, String>>(){});
     }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
index 4384762..41dd53b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
@@ -28,10 +28,12 @@ public class PulsarResources {
     private TenantResources tenatResources;
     private ClusterResources clusterResources;
     private NamespaceResources namespaceResources;
+    private DynamicConfigurationResources dynamicConfigResources;
 
-    public PulsarResources(MetadataStoreExtended configurationMetadataStore) {
+    public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreExtended configurationMetadataStore) {
         tenatResources = new TenantResources(configurationMetadataStore);
         clusterResources = new ClusterResources(configurationMetadataStore);
         namespaceResources = new NamespaceResources(configurationMetadataStore);
+        dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 1a307a8..141cae9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.impl.ClusterResources;
+import org.apache.pulsar.broker.admin.impl.DynamicConfigurationResources;
 import org.apache.pulsar.broker.admin.impl.NamespaceResources;
 import org.apache.pulsar.broker.admin.impl.NamespaceResources.IsolationPolicyResources;
 import org.apache.pulsar.broker.admin.impl.TenantResources;
@@ -879,6 +880,10 @@ public abstract class PulsarWebResource {
         return namespaceResources().getIsolationPolicies();
     }
 
+    protected DynamicConfigurationResources dynamicConfigurationResources() {
+        return pulsar().getPulsarResources().getDynamicConfigResources();
+    }
+
     public static ObjectMapper jsonMapper() {
         return ObjectMapperFactory.getThreadLocal();
     }
@@ -992,4 +997,28 @@ public abstract class PulsarWebResource {
         return activeNamespaceFuture.isEmpty() ? CompletableFuture.completedFuture(null)
                 : FutureUtil.waitForAll(activeNamespaceFuture);
     }
+
+    /**
+     * Redirect the call to the specified broker.
+     *
+     * @param broker
+     *            Broker name
+     * @throws MalformedURLException
+     *             In case the redirect happens
+     */
+    protected void validateBrokerName(String broker) throws MalformedURLException {
+        String brokerUrl = String.format("http://%s", broker);
+        String brokerUrlTls = String.format("https://%s", broker);
+        if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
+                && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
+            String[] parts = broker.split(":");
+            checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
+            String host = parts[0];
+            int port = Integer.parseInt(parts[1]);
+
+            URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
+            log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
+            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index f51eed1..ee157e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -134,10 +134,6 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         clusters = spy(new Clusters());
         clusters.setPulsar(pulsar);
-        /*doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
-        doReturn(configurationCache.clustersCache()).when(clusters).clustersCache();
-        doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache();
-        doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();*/
         doReturn("test").when(clusters).clientAppId();
         doNothing().when(clusters).validateSuperUserAccess();
 
@@ -160,11 +156,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         doNothing().when(namespaces).validateAdminAccessForTenant("new-property");
 
         brokers = spy(new Brokers());
-        brokers.setServletContext(new MockServletContext());
         brokers.setPulsar(pulsar);
-        doReturn(mockZooKeeperGlobal).when(brokers).globalZk();
-        doReturn(mockZooKeeper).when(brokers).localZk();
-        doReturn(configurationCache.clustersListCache()).when(brokers).clustersListCache();
         doReturn("test").when(brokers).clientAppId();
         doNothing().when(brokers).validateSuperUserAccess();