You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/03 13:50:21 UTC
[pulsar] branch branch-2.8 updated: [Broker] Fix and improve topic ownership assignment (#13069) (#13117)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 3f3098c [Broker] Fix and improve topic ownership assignment (#13069) (#13117)
3f3098c is described below
commit 3f3098cfce236e986c86f1ab36694efce0a48c06
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Dec 3 15:49:20 2021 +0200
[Broker] Fix and improve topic ownership assignment (#13069) (#13117)
---
pulsar-broker/pom.xml | 8 ++
.../pulsar/broker/namespace/NamespaceService.java | 104 ++++++++++++-------
.../common/naming/NamespaceBundleFactory.java | 33 +++++-
.../apache/pulsar/broker/MultiBrokerBaseTest.java | 6 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 39 ++++---
.../loadbalance/MultiBrokerLeaderElectionTest.java | 112 +++++++++++++++++++++
pulsar-metadata/pom.xml | 11 ++
7 files changed, 258 insertions(+), 55 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 2a4bb82..aa5b7a9 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -561,6 +561,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-metadata</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index f167fac..4fff4b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -461,6 +461,9 @@ public class NamespaceService implements AutoCloseable {
// The leader election service was not initialized yet. This can happen because the broker service is
// initialized first and it might start receiving lookup requests before the leader election service is
// fully initialized.
+ LOG.warn("Leader election service isn't initialized yet. "
+ + "Returning empty result to lookup. NamespaceBundle[{}]",
+ bundle);
lookupFuture.complete(Optional.empty());
return;
}
@@ -484,23 +487,45 @@ public class NamespaceService implements AutoCloseable {
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getSafeWebServiceAddress();
- } else if (!this.loadManager.get().isCentralized()
- || pulsar.getLeaderElectionService().isLeader()
- || !currentLeader.isPresent()
-
+ } else {
+ LoadManager loadManager = this.loadManager.get();
+ boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
+ if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
- || !isBrokerActive(currentLeader.get().getServiceUrl())
- ) {
- Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
- if (!availableBroker.isPresent()) {
- lookupFuture.complete(Optional.empty());
- return;
+ boolean leaderBrokerActive = currentLeader.isPresent()
+ && isBrokerActive(currentLeader.get().getServiceUrl());
+ if (!leaderBrokerActive) {
+ makeLoadManagerDecisionOnThisBroker = true;
+ if (!currentLeader.isPresent()) {
+ LOG.warn(
+ "The information about the current leader broker wasn't available. "
+ + "Handling load manager decisions in a decentralized way. "
+ + "NamespaceBundle[{}]",
+ bundle);
+ } else {
+ LOG.warn(
+ "The current leader broker {} isn't active. "
+ + "Handling load manager decisions in a decentralized way. "
+ + "NamespaceBundle[{}]",
+ currentLeader.get(), bundle);
+ }
+ }
+ }
+ if (makeLoadManagerDecisionOnThisBroker) {
+ Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
+ if (!availableBroker.isPresent()) {
+ LOG.warn("Load manager didn't return any available broker. "
+ + "Returning empty result to lookup. NamespaceBundle[{}]",
+ bundle);
+ lookupFuture.complete(Optional.empty());
+ return;
+ }
+ candidateBroker = availableBroker.get();
+ authoritativeRedirect = true;
+ } else {
+ // forward to leader broker to make assignment
+ candidateBroker = currentLeader.get().getServiceUrl();
}
- candidateBroker = availableBroker.get();
- authoritativeRedirect = true;
- } else {
- // forward to leader broker to make assignment
- candidateBroker = currentLeader.get().getServiceUrl();
}
}
} catch (Exception e) {
@@ -583,19 +608,16 @@ public class NamespaceService implements AutoCloseable {
}
protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
- final String advertisedListenerName)
- throws Exception {
+ final String advertisedListenerName) {
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
- checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
- URI uri = new URI(candidateBroker);
- String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
- uri.getPort());
+ checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
+ String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);
localBrokerDataCache.get(path).thenAccept(reportData -> {
if (reportData.isPresent()) {
- LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
+ LocalBrokerData lookupData = reportData.get();
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
if (listener == null) {
@@ -627,22 +649,36 @@ public class NamespaceService implements AutoCloseable {
}
private boolean isBrokerActive(String candidateBroker) {
- List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
- for (String brokerHostPort : brokers) {
- if (candidateBroker.equals("http://" + brokerHostPort)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
- }
- return true;
+ String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+ Set<String> availableBrokers = getAvailableBrokers();
+ if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
}
+ return true;
+ } else {
+ LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
+ candidateBroker, candidateBrokerHostAndPort,
+ availableBrokers.stream().collect(Collectors.joining(",")));
+ return false;
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Broker not found for SLA Monitoring Namespace {}",
- candidateBroker + ":" + config.getWebServicePort());
+ private static String parseHostAndPort(String candidateBroker) {
+ int uriSeparatorPos = candidateBroker.indexOf("://");
+ if (uriSeparatorPos == -1) {
+ throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
+ }
+ String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
+ return candidateBrokerHostAndPort;
+ }
+
+ private Set<String> getAvailableBrokers() {
+ try {
+ return loadManager.get().getAvailableBrokers();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- return false;
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 3e30148..077b1f4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -33,16 +33,19 @@ import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
+import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -63,6 +66,7 @@ public class NamespaceBundleFactory {
private final PulsarService pulsar;
private final MetadataCache<Policies> policiesCache;
+ private final Duration maxRetryDuration = Duration.ofSeconds(10);
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;
@@ -90,22 +94,27 @@ public class NamespaceBundleFactory {
}
CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+ doLoadBundles(namespace, path, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+ return future;
+ }
+
+ private void doLoadBundles(NamespaceName namespace, String path, CompletableFuture<NamespaceBundles> future,
+ Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getLocalMetadataStore().get(path).thenAccept(result -> {
-
if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(), result.get().getStat().getVersion()));
} catch (IOException e) {
- future.completeExceptionally(e);
+ handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
- future.completeExceptionally(ex);
+ handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, ex);
return null;
});
}
@@ -113,7 +122,23 @@ public class NamespaceBundleFactory {
future.completeExceptionally(ex);
return null;
});
- return future;
+ }
+
+ private void handleLoadBundlesRetry(NamespaceName namespace, String path,
+ CompletableFuture<NamespaceBundles> future,
+ Backoff backoff, long retryDeadline, Throwable e) {
+ if (e instanceof Error || System.nanoTime() > retryDeadline) {
+ future.completeExceptionally(e);
+ } else {
+ LOG.warn("Error loading bundle for {} from path {}. Retrying exception", namespace, path, e);
+ long retryDelay = backoff.next();
+ pulsar.getExecutor().schedule(() ->
+ doLoadBundles(namespace, path, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private static Backoff createBackoff() {
+ return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
}
private NamespaceBundles readBundles(NamespaceName namespace, byte[] value, long version) throws IOException {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index f4d106d..c00ae8c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.MockZooKeeperSession;
import org.testng.annotations.AfterClass;
@@ -82,13 +84,13 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
}
@Override
- protected ZKMetadataStore createLocalMetadataStore() {
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
}
@Override
- protected ZKMetadataStore createConfigurationMetadataStore() {
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 98b2dab..0726a5d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -59,6 +59,8 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -232,6 +234,11 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
bkExecutor = null;
}
+ onCleanup();
+ }
+
+ protected void onCleanup() {
+
}
protected abstract void setup() throws Exception;
@@ -304,7 +311,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
- doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
+ doReturn(createZooKeeperClientFactory()).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();
@@ -321,11 +328,11 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
}
- protected ZKMetadataStore createLocalMetadataStore() {
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeper);
}
- protected ZKMetadataStore createConfigurationMetadataStore() {
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeperGlobal);
}
@@ -398,21 +405,23 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
}
- protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
+ protected ZooKeeperClientFactory createZooKeeperClientFactory() {
+ return new ZooKeeperClientFactory() {
- @Override
- public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
- int zkSessionTimeoutMillis) {
+ @Override
+ public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
+ int zkSessionTimeoutMillis) {
- if (serverList != null
- && (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
- || serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
- return CompletableFuture.completedFuture(mockZooKeeperGlobal);
- }
+ if (serverList != null
+ && (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
+ || serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
+ return CompletableFuture.completedFuture(mockZooKeeperGlobal);
+ }
- return CompletableFuture.completedFuture(mockZooKeeper);
- }
- };
+ return CompletableFuture.completedFuture(mockZooKeeper);
+ }
+ };
+ }
private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 0045ddd..230eecd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -20,15 +20,83 @@ package org.apache.pulsar.broker.loadbalance;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.metadata.TestZKServer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
+@Slf4j
@Test(groups = "broker")
public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+ @Override
+ protected int numberOfAdditionalBrokers() {
+ return 9;
+ }
+
+ TestZKServer testZKServer;
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ testZKServer = new TestZKServer();
+ }
+
+ @Override
+ protected void onCleanup() {
+ super.onCleanup();
+ if (testZKServer != null) {
+ try {
+ testZKServer.close();
+ } catch (Exception e) {
+ log.error("Error in stopping ZK server", e);
+ }
+ }
+ }
+
+ @Override
+ protected ZooKeeperClientFactory createZooKeeperClientFactory() {
+ return new ZookeeperClientFactoryImpl() {
+ @Override
+ public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
+ int zkSessionTimeoutMillis) {
+ return super.create(testZKServer.getConnectionString(), sessionType, zkSessionTimeoutMillis);
+ }
+ };
+ }
+
+ @Override
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+ return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+ }
+
+ @Override
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+ return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+ }
@Test
public void shouldElectOneLeader() {
@@ -68,4 +136,48 @@ public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
}
});
}
+
+ @Test
+ public void shouldProvideConsistentAnswerToTopicLookups()
+ throws PulsarAdminException, ExecutionException, InterruptedException {
+ String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
+ List<String> topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
+ .collect(Collectors.toList());
+ List<PulsarAdmin> allAdmins = getAllAdmins();
+ @Cleanup("shutdown")
+ ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
+ List<Future<List<String>>> resultFutures = new ArrayList<>();
+ String leaderBrokerUrl = admin.brokers().getLeaderBroker().getServiceUrl();
+ log.info("LEADER is {}", leaderBrokerUrl);
+ // use Phaser to increase the chances of a race condition by triggering all threads once
+ // they are waiting just before the lookupTopic call
+ final Phaser phaser = new Phaser(1);
+ for (PulsarAdmin brokerAdmin : allAdmins) {
+ if (!leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) {
+ phaser.register();
+ log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
+ resultFutures.add(executorService.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
+ return topicNames.stream().map(topicName -> {
+ try {
+ return brokerAdmin.lookups().lookupTopic(topicName);
+ } catch (PulsarAdminException e) {
+ log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }));
+ }
+ }
+ phaser.arriveAndAwaitAdvance();
+ List<String> firstResult = null;
+ for (Future<List<String>> resultFuture : resultFutures) {
+ List<String> result = resultFuture.get();
+ if (firstResult == null) {
+ firstResult = result;
+ } else {
+ assertEquals(result, firstResult, "The lookup results weren't consistent.");
+ }
+ }
+ }
}
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 7e19b00..f40891f 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -83,6 +83,17 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<version>${spotbugs-maven-plugin.version}</version>