You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2021/01/30 03:04:31 UTC
[pulsar] branch master updated: [pulsar-broker] broker resources use metadata-store api (#9346)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fa66a24 [pulsar-broker] broker resources use metadata-store api (#9346)
fa66a24 is described below
commit fa66a2410e3266ad5b01f00f96504e84fe6084cd
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Jan 29 19:03:36 2021 -0800
[pulsar-broker] broker resources use metadata-store api (#9346)
* [pulsar-broker] broker resources use metadata-store api
fix test
* fix api
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../apache/pulsar/broker/admin/AdminResource.java | 27 ----------
.../pulsar/broker/admin/impl/BaseResources.java | 25 +++++----
.../pulsar/broker/admin/impl/BrokersBase.java | 61 ++++++----------------
.../pulsar/broker/admin/impl/ClustersBase.java | 8 +--
...ces.java => DynamicConfigurationResources.java} | 18 +++----
.../pulsar/broker/admin/impl/PulsarResources.java | 4 +-
.../pulsar/broker/web/PulsarWebResource.java | 29 ++++++++++
.../org/apache/pulsar/broker/admin/AdminTest.java | 8 ---
9 files changed, 77 insertions(+), 105 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3e03297..fc8f309 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -484,7 +484,7 @@ public class PulsarService implements AutoCloseable {
coordinationService = new CoordinationServiceImpl(localMetadataStore);
configurationMetadataStore = createConfigurationMetadataStore();
- pulsarResources = new PulsarResources(configurationMetadataStore);
+ pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore);
orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1728c88..453c39b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -23,8 +23,6 @@ import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
-import java.net.MalformedURLException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -39,7 +37,6 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -417,30 +414,6 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
- /**
- * Redirect the call to the specified broker.
- *
- * @param broker
- * Broker name
- * @throws MalformedURLException
- * In case the redirect happens
- */
- protected void validateBrokerName(String broker) throws MalformedURLException {
- String brokerUrl = String.format("http://%s", broker);
- String brokerUrlTls = String.format("https://%s", broker);
- if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
- && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
- String[] parts = broker.split(":");
- checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
- String host = parts[0];
- int port = Integer.parseInt(parts[1]);
-
- URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
- log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
- throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
- }
- }
-
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
index 07cd9c4..dfbad73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
@@ -97,13 +97,24 @@ public class BaseResources<T> {
return cache.readModifyUpdate(path, modifyFunction);
}
- public void create(String path, T data) throws MetadataStoreException {
- create(path, t -> data);
+ public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
+ try {
+ setWithCreateAsync(path, createFunction).get();
+ } catch (ExecutionException e) {
+ throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
+ : new MetadataStoreException(e.getCause());
+ } catch (Exception e) {
+ throw new MetadataStoreException("Failed to set/create " + path, e);
+ }
}
- public void create(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
+ public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional<T>, T> createFunction) {
+ return cache.readModifyUpdateOrCreate(path, createFunction);
+ }
+
+ public void create(String path, T data) throws MetadataStoreException {
try {
- createAsync(path, createFunction).get();
+ createAsync(path, data).get();
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
@@ -113,11 +124,7 @@ public class BaseResources<T> {
}
public CompletableFuture<Void> createAsync(String path, T data) {
- return createAsync(path, t -> data);
- }
-
- public CompletableFuture<Void> createAsync(String path, Function<Optional<T>, T> createFunction) {
- return cache.readModifyUpdateOrCreate(path, createFunction);
+ return cache.create(path, data);
}
public void delete(String path) throws MetadataStoreException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 85549af..25b5149 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,14 +41,13 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -57,19 +57,14 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Broker admin base.
*/
-public class BrokersBase extends AdminResource {
+public class BrokersBase extends PulsarWebResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
- private int serviceConfigZkVersion = -1;
@GET
@Path("/{cluster}")
@@ -90,7 +85,7 @@ public class BrokersBase extends AdminResource {
try {
// Add Native brokers
- return pulsar().getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);
+ return new HashSet<>(dynamicConfigurationResources().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT));
} catch (Exception e) {
LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e);
throw new RestException(e);
@@ -134,7 +129,7 @@ public class BrokersBase extends AdminResource {
public void updateDynamicConfiguration(@PathParam("configName") String configName,
@PathParam("configValue") String configValue) throws Exception {
validateSuperUserAccess();
- updateDynamicConfigurationOnZk(configName, configValue);
+ persistDynamicConfiguration(configName, configValue);
}
@DELETE
@@ -159,12 +154,8 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 500, message = "Internal server error")})
public Map<String, String> getAllDynamicConfigurations() throws Exception {
validateSuperUserAccess();
-
- ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
- .getDynamicConfigurationCache();
- Map<String, String> configurationMap = null;
try {
- configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+ return dynamicConfigurationResources().get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
} catch (RestException e) {
LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
@@ -173,7 +164,6 @@ public class BrokersBase extends AdminResource {
LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
- return configurationMap;
}
@GET
@@ -204,29 +194,17 @@ public class BrokersBase extends AdminResource {
* @param configValue
* : configuration value
*/
- private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
+ private synchronized void persistDynamicConfiguration(String configName, String configValue) {
try {
if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
throw new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value");
}
if (BrokerService.isDynamicConfiguration(configName)) {
- ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
- .getDynamicConfigurationCache();
- Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
- .orElse(null);
- if (configurationMap != null) {
- configurationMap.put(configName, configValue);
- byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
- dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
- serviceConfigZkVersion = localZk()
- .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
- } else {
- configurationMap = Maps.newHashMap();
+ dynamicConfigurationResources().setWithCreate(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
+ Map<String, String> configurationMap = old.isPresent() ? old.get() : Maps.newHashMap();
configurationMap.put(configName, configValue);
- byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
- ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
+ return configurationMap;
+ });
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
} else {
if (LOG.isDebugEnabled()) {
@@ -393,17 +371,12 @@ public class BrokersBase extends AdminResource {
private synchronized void deleteDynamicConfigurationOnZk(String configName) {
try {
if (BrokerService.isDynamicConfiguration(configName)) {
- ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
- .getDynamicConfigurationCache();
- Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
- .orElse(null);
- if (configurationMap != null && configurationMap.containsKey(configName)) {
- configurationMap.remove(configName);
- byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
- dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
- serviceConfigZkVersion = localZk()
- .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
- }
+ dynamicConfigurationResources().set(BROKER_SERVICE_CONFIGURATION_PATH, (old) -> {
+ if (old != null) {
+ old.remove(configName);
+ }
+ return old;
+ });
LOG.info("[{}] Deleted Service configuration {}", clientAppId(), configName);
} else {
if (LOG.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index b483114..a52a5c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -685,7 +685,8 @@ public class ClustersBase extends PulsarWebResource {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
- namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap());
+ namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
+ (p) -> Collections.emptyMap());
return new NamespaceIsolationPolicies();
} catch (Exception e) {
throw new RestException(e);
@@ -835,7 +836,8 @@ public class ClustersBase extends PulsarWebResource {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
.getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
- namespaceIsolationPolicies().create(nsIsolationPolicyPath, Collections.emptyMap());
+ namespaceIsolationPolicies().setWithCreate(nsIsolationPolicyPath,
+ (p) -> Collections.emptyMap());
return new NamespaceIsolationPolicies();
} catch (Exception e) {
throw new RestException(e);
@@ -893,7 +895,7 @@ public class ClustersBase extends PulsarWebResource {
try {
String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
FailureDomainResources failureDomainListCache = clusterResources().getFailureDomainResources();
- failureDomainListCache.create(domainPath, old -> domain);
+ failureDomainListCache.setWithCreate(domainPath, old -> domain);
} catch (NotFoundException nne) {
log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster,
domainName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
similarity index 62%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
index 4384762..99c8d3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
@@ -18,20 +18,14 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import lombok.AccessLevel;
-import lombok.Getter;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.Map;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-@Getter(AccessLevel.PUBLIC)
-public class PulsarResources {
+public class DynamicConfigurationResources extends BaseResources<Map<String, String>> {
- private TenantResources tenatResources;
- private ClusterResources clusterResources;
- private NamespaceResources namespaceResources;
-
- public PulsarResources(MetadataStoreExtended configurationMetadataStore) {
- tenatResources = new TenantResources(configurationMetadataStore);
- clusterResources = new ClusterResources(configurationMetadataStore);
- namespaceResources = new NamespaceResources(configurationMetadataStore);
+ public DynamicConfigurationResources(MetadataStoreExtended store) {
+ super(store, new TypeReference<Map<String, String>>(){});
}
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
index 4384762..41dd53b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
@@ -28,10 +28,12 @@ public class PulsarResources {
private TenantResources tenatResources;
private ClusterResources clusterResources;
private NamespaceResources namespaceResources;
+ private DynamicConfigurationResources dynamicConfigResources;
- public PulsarResources(MetadataStoreExtended configurationMetadataStore) {
+ public PulsarResources(MetadataStoreExtended localMetadataStore, MetadataStoreExtended configurationMetadataStore) {
tenatResources = new TenantResources(configurationMetadataStore);
clusterResources = new ClusterResources(configurationMetadataStore);
namespaceResources = new NamespaceResources(configurationMetadataStore);
+ dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 1a307a8..141cae9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.ClusterResources;
+import org.apache.pulsar.broker.admin.impl.DynamicConfigurationResources;
import org.apache.pulsar.broker.admin.impl.NamespaceResources;
import org.apache.pulsar.broker.admin.impl.NamespaceResources.IsolationPolicyResources;
import org.apache.pulsar.broker.admin.impl.TenantResources;
@@ -879,6 +880,10 @@ public abstract class PulsarWebResource {
return namespaceResources().getIsolationPolicies();
}
+ protected DynamicConfigurationResources dynamicConfigurationResources() {
+ return pulsar().getPulsarResources().getDynamicConfigResources();
+ }
+
public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}
@@ -992,4 +997,28 @@ public abstract class PulsarWebResource {
return activeNamespaceFuture.isEmpty() ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(activeNamespaceFuture);
}
+
+ /**
+ * Redirect the call to the specified broker.
+ *
+ * @param broker
+ * Broker name
+ * @throws MalformedURLException
+ * In case the redirect happens
+ */
+ protected void validateBrokerName(String broker) throws MalformedURLException {
+ String brokerUrl = String.format("http://%s", broker);
+ String brokerUrlTls = String.format("https://%s", broker);
+ if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
+ && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
+ String[] parts = broker.split(":");
+ checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
+ String host = parts[0];
+ int port = Integer.parseInt(parts[1]);
+
+ URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
+ log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker);
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index f51eed1..ee157e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -134,10 +134,6 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
clusters = spy(new Clusters());
clusters.setPulsar(pulsar);
- /*doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
- doReturn(configurationCache.clustersCache()).when(clusters).clustersCache();
- doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache();
- doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();*/
doReturn("test").when(clusters).clientAppId();
doNothing().when(clusters).validateSuperUserAccess();
@@ -160,11 +156,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
doNothing().when(namespaces).validateAdminAccessForTenant("new-property");
brokers = spy(new Brokers());
- brokers.setServletContext(new MockServletContext());
brokers.setPulsar(pulsar);
- doReturn(mockZooKeeperGlobal).when(brokers).globalZk();
- doReturn(mockZooKeeper).when(brokers).localZk();
- doReturn(configurationCache.clustersListCache()).when(brokers).clustersListCache();
doReturn("test").when(brokers).clientAppId();
doNothing().when(brokers).validateSuperUserAccess();