You are viewing a plain text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/03 13:50:21 UTC

[pulsar] branch branch-2.8 updated: [Broker] Fix and improve topic ownership assignment (#13069) (#13117)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 3f3098c  [Broker] Fix and improve topic ownership assignment (#13069) (#13117)
3f3098c is described below

commit 3f3098cfce236e986c86f1ab36694efce0a48c06
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Dec 3 15:49:20 2021 +0200

    [Broker] Fix and improve topic ownership assignment (#13069) (#13117)
---
 pulsar-broker/pom.xml                              |   8 ++
 .../pulsar/broker/namespace/NamespaceService.java  | 104 ++++++++++++-------
 .../common/naming/NamespaceBundleFactory.java      |  33 +++++-
 .../apache/pulsar/broker/MultiBrokerBaseTest.java  |   6 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  39 ++++---
 .../loadbalance/MultiBrokerLeaderElectionTest.java | 112 +++++++++++++++++++++
 pulsar-metadata/pom.xml                            |  11 ++
 7 files changed, 258 insertions(+), 55 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 2a4bb82..aa5b7a9 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -561,6 +561,14 @@
           <type>test-jar</type>
           <scope>test</scope>
         </dependency>
+
+        <dependency>
+          <groupId>${project.groupId}</groupId>
+          <artifactId>pulsar-metadata</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index f167fac..4fff4b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -461,6 +461,9 @@ public class NamespaceService implements AutoCloseable {
             // The leader election service was not initialized yet. This can happen because the broker service is
             // initialized first and it might start receiving lookup requests before the leader election service is
             // fully initialized.
+            LOG.warn("Leader election service isn't initialized yet. "
+                            + "Returning empty result to lookup. NamespaceBundle[{}]",
+                    bundle);
             lookupFuture.complete(Optional.empty());
             return;
         }
@@ -484,23 +487,45 @@ public class NamespaceService implements AutoCloseable {
                 if (options.isAuthoritative()) {
                     // leader broker already assigned the current broker as owner
                     candidateBroker = pulsar.getSafeWebServiceAddress();
-                } else if (!this.loadManager.get().isCentralized()
-                        || pulsar.getLeaderElectionService().isLeader()
-                        || !currentLeader.isPresent()
-
+                } else {
+                    LoadManager loadManager = this.loadManager.get();
+                    boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
+                    if (!makeLoadManagerDecisionOnThisBroker) {
                         // If leader is not active, fallback to pick the least loaded from current broker loadmanager
-                        || !isBrokerActive(currentLeader.get().getServiceUrl())
-                ) {
-                    Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
-                    if (!availableBroker.isPresent()) {
-                        lookupFuture.complete(Optional.empty());
-                        return;
+                        boolean leaderBrokerActive = currentLeader.isPresent()
+                                && isBrokerActive(currentLeader.get().getServiceUrl());
+                        if (!leaderBrokerActive) {
+                            makeLoadManagerDecisionOnThisBroker = true;
+                            if (!currentLeader.isPresent()) {
+                                LOG.warn(
+                                        "The information about the current leader broker wasn't available. "
+                                                + "Handling load manager decisions in a decentralized way. "
+                                                + "NamespaceBundle[{}]",
+                                        bundle);
+                            } else {
+                                LOG.warn(
+                                        "The current leader broker {} isn't active. "
+                                                + "Handling load manager decisions in a decentralized way. "
+                                                + "NamespaceBundle[{}]",
+                                        currentLeader.get(), bundle);
+                            }
+                        }
+                    }
+                    if (makeLoadManagerDecisionOnThisBroker) {
+                        Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
+                        if (!availableBroker.isPresent()) {
+                            LOG.warn("Load manager didn't return any available broker. "
+                                            + "Returning empty result to lookup. NamespaceBundle[{}]",
+                                    bundle);
+                            lookupFuture.complete(Optional.empty());
+                            return;
+                        }
+                        candidateBroker = availableBroker.get();
+                        authoritativeRedirect = true;
+                    } else {
+                        // forward to leader broker to make assignment
+                        candidateBroker = currentLeader.get().getServiceUrl();
                     }
-                    candidateBroker = availableBroker.get();
-                    authoritativeRedirect = true;
-                } else {
-                    // forward to leader broker to make assignment
-                    candidateBroker = currentLeader.get().getServiceUrl();
                 }
             }
         } catch (Exception e) {
@@ -583,19 +608,16 @@ public class NamespaceService implements AutoCloseable {
     }
 
     protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
-                                                                 final String advertisedListenerName)
-            throws Exception {
+                                                                 final String advertisedListenerName) {
 
         CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
         try {
-            checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
-            URI uri = new URI(candidateBroker);
-            String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
-                    uri.getPort());
+            checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
+            String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);
 
             localBrokerDataCache.get(path).thenAccept(reportData -> {
                 if (reportData.isPresent()) {
-                    LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
+                    LocalBrokerData lookupData = reportData.get();
                     if (StringUtils.isNotBlank(advertisedListenerName)) {
                         AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
                         if (listener == null) {
@@ -627,22 +649,36 @@ public class NamespaceService implements AutoCloseable {
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
-                }
-                return true;
+        String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+        Set<String> availableBrokers = getAvailableBrokers();
+        if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
             }
+            return true;
+        } else {
+            LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
+                    candidateBroker, candidateBrokerHostAndPort,
+                    availableBrokers.stream().collect(Collectors.joining(",")));
+            return false;
         }
+    }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+    private static String parseHostAndPort(String candidateBroker) {
+        int uriSeparatorPos = candidateBroker.indexOf("://");
+        if (uriSeparatorPos == -1) {
+            throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
+        }
+        String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
+        return candidateBrokerHostAndPort;
+    }
+
+    private Set<String> getAvailableBrokers() {
+        try {
+            return loadManager.get().getAvailableBrokers();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-        return false;
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 3e30148..077b1f4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -33,16 +33,19 @@ import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.hash.HashFunction;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.SortedSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -63,6 +66,7 @@ public class NamespaceBundleFactory {
 
     private final PulsarService pulsar;
     private final MetadataCache<Policies> policiesCache;
+    private final Duration maxRetryDuration = Duration.ofSeconds(10);
 
     public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
         this.hashFunc = hashFunc;
@@ -90,22 +94,27 @@ public class NamespaceBundleFactory {
         }
 
         CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+        doLoadBundles(namespace, path, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+        return future;
+    }
+
+    private void doLoadBundles(NamespaceName namespace, String path, CompletableFuture<NamespaceBundles> future,
+                               Backoff backoff, long retryDeadline) {
         // Read the static bundle data from the policies
         pulsar.getLocalMetadataStore().get(path).thenAccept(result -> {
-
             if (result.isPresent()) {
                 try {
                     future.complete(readBundles(namespace,
                             result.get().getValue(), result.get().getStat().getVersion()));
                 } catch (IOException e) {
-                    future.completeExceptionally(e);
+                    handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, e);
                 }
             } else {
                 // If no local policies defined for namespace, copy from global config
                 copyToLocalPolicies(namespace)
                         .thenAccept(b -> future.complete(b))
                         .exceptionally(ex -> {
-                            future.completeExceptionally(ex);
+                            handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, ex);
                             return null;
                         });
             }
@@ -113,7 +122,23 @@ public class NamespaceBundleFactory {
             future.completeExceptionally(ex);
             return null;
         });
-        return future;
+    }
+
+    private void handleLoadBundlesRetry(NamespaceName namespace, String path,
+                                        CompletableFuture<NamespaceBundles> future,
+                                        Backoff backoff, long retryDeadline, Throwable e) {
+        if (e instanceof Error || System.nanoTime() > retryDeadline) {
+            future.completeExceptionally(e);
+        } else {
+            LOG.warn("Error loading bundle for {} from path {}. Retrying exception", namespace, path, e);
+            long retryDelay = backoff.next();
+            pulsar.getExecutor().schedule(() ->
+                    doLoadBundles(namespace, path, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private static Backoff createBackoff() {
+        return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
     }
 
     private NamespaceBundles readBundles(NamespaceName namespace, byte[] value, long version) throws IOException {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index f4d106d..c00ae8c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.MockZooKeeperSession;
 import org.testng.annotations.AfterClass;
@@ -82,13 +84,13 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
     }
 
     @Override
-    protected ZKMetadataStore createLocalMetadataStore() {
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
         // use MockZooKeeperSession to provide a unique session id for each instance
         return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
     }
 
     @Override
-    protected ZKMetadataStore createConfigurationMetadataStore() {
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
         // use MockZooKeeperSession to provide a unique session id for each instance
         return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 98b2dab..0726a5d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -59,6 +59,8 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -232,6 +234,11 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
             }
             bkExecutor = null;
         }
+        onCleanup();
+    }
+
+    protected void onCleanup() {
+
     }
 
     protected abstract void setup() throws Exception;
@@ -304,7 +311,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
 
     protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
         // Override default providers with mocked ones
-        doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
+        doReturn(createZooKeeperClientFactory()).when(pulsar).getZooKeeperClientFactory();
         doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
         doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
         doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();
@@ -321,11 +328,11 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         }
     }
 
-    protected ZKMetadataStore createLocalMetadataStore() {
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
         return new ZKMetadataStore(mockZooKeeper);
     }
 
-    protected ZKMetadataStore createConfigurationMetadataStore() {
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
         return new ZKMetadataStore(mockZooKeeperGlobal);
     }
 
@@ -398,21 +405,23 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         }
     }
 
-    protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
+    protected ZooKeeperClientFactory createZooKeeperClientFactory() {
+        return new ZooKeeperClientFactory() {
 
-        @Override
-        public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
-                int zkSessionTimeoutMillis) {
+            @Override
+            public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
+                                                       int zkSessionTimeoutMillis) {
 
-            if (serverList != null
-                    && (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
-                    || serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
-                return CompletableFuture.completedFuture(mockZooKeeperGlobal);
-            }
+                if (serverList != null
+                        && (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
+                        || serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
+                    return CompletableFuture.completedFuture(mockZooKeeperGlobal);
+                }
 
-            return CompletableFuture.completedFuture(mockZooKeeper);
-        }
-    };
+                return CompletableFuture.completedFuture(mockZooKeeper);
+            }
+        };
+    }
 
     private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 0045ddd..230eecd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -20,15 +20,83 @@ package org.apache.pulsar.broker.loadbalance;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.MultiBrokerBaseTest;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.metadata.TestZKServer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+    @Override
+    protected int numberOfAdditionalBrokers() {
+        return 9;
+    }
+
+    TestZKServer testZKServer;
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        testZKServer = new TestZKServer();
+    }
+
+    @Override
+    protected void onCleanup() {
+        super.onCleanup();
+        if (testZKServer != null) {
+            try {
+                testZKServer.close();
+            } catch (Exception e) {
+                log.error("Error in stopping ZK server", e);
+            }
+        }
+    }
+
+    @Override
+    protected ZooKeeperClientFactory createZooKeeperClientFactory() {
+        return new ZookeeperClientFactoryImpl() {
+            @Override
+            public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
+                                                       int zkSessionTimeoutMillis) {
+                return super.create(testZKServer.getConnectionString(), sessionType, zkSessionTimeoutMillis);
+            }
+        };
+    }
+
+    @Override
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
+
+    @Override
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
 
     @Test
     public void shouldElectOneLeader() {
@@ -68,4 +136,48 @@ public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
             }
         });
     }
+
+    @Test
+    public void shouldProvideConsistentAnswerToTopicLookups()
+            throws PulsarAdminException, ExecutionException, InterruptedException {
+        String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
+        List<String> topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
+                .collect(Collectors.toList());
+        List<PulsarAdmin> allAdmins = getAllAdmins();
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
+        List<Future<List<String>>> resultFutures = new ArrayList<>();
+        String leaderBrokerUrl = admin.brokers().getLeaderBroker().getServiceUrl();
+        log.info("LEADER is {}", leaderBrokerUrl);
+        // use Phaser to increase the chances of a race condition by triggering all threads once
+        // they are waiting just before the lookupTopic call
+        final Phaser phaser = new Phaser(1);
+        for (PulsarAdmin brokerAdmin : allAdmins) {
+            if (!leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) {
+                phaser.register();
+                log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
+                resultFutures.add(executorService.submit(() -> {
+                    phaser.arriveAndAwaitAdvance();
+                    return topicNames.stream().map(topicName -> {
+                        try {
+                            return brokerAdmin.lookups().lookupTopic(topicName);
+                        } catch (PulsarAdminException e) {
+                            log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
+                            throw new RuntimeException(e);
+                        }
+                    }).collect(Collectors.toList());
+                }));
+            }
+        }
+        phaser.arriveAndAwaitAdvance();
+        List<String> firstResult = null;
+        for (Future<List<String>> resultFuture : resultFutures) {
+            List<String> result = resultFuture.get();
+            if (firstResult == null) {
+                firstResult = result;
+            } else {
+                assertEquals(result, firstResult, "The lookup results weren't consistent.");
+            }
+        }
+    }
 }
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 7e19b00..f40891f 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -83,6 +83,17 @@
   <build>
     <plugins>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>com.github.spotbugs</groupId>
         <artifactId>spotbugs-maven-plugin</artifactId>
         <version>${spotbugs-maven-plugin.version}</version>