You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/27 21:25:12 UTC

[GitHub] merlimat closed pull request #1428: Issue #1117: handle race in concurrent bundle split

merlimat closed pull request #1428: Issue #1117: handle race in concurrent bundle split
URL: https://github.com/apache/incubator-pulsar/pull/1428
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 66a1ffaab..4b28cad8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -22,6 +22,8 @@
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 
+import com.google.common.collect.Maps;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -38,6 +40,7 @@
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,20 +82,28 @@ public LocalPolicies deserialize(String path, byte[] content) throws Exception {
 
             @Override
             public CompletableFuture<Optional<LocalPolicies>> getAsync(String path) {
-                CompletableFuture<Optional<LocalPolicies>> future = new CompletableFuture<>();
+                return getWithStatAsync(path).thenApply(entry -> entry.map(e -> e.getKey()));
+            }
+
+            @Override
+            public CompletableFuture<Optional<Entry<LocalPolicies, Stat>>> getWithStatAsync(String path) {
+                CompletableFuture<Optional<Entry<LocalPolicies, Stat>>> future = new CompletableFuture<>();
 
                 // First check in local-zk cache
-                super.getAsync(path).thenAccept(localPolicies -> {
+                super.getWithStatAsync(path).thenAccept(result -> {
+                    Optional<LocalPolicies> localPolicies = result.map(Entry::getKey);
                     if (localPolicies.isPresent()) {
-                        future.complete(localPolicies);
+                        future.complete(result);
                     } else {
                         // create new policies node under Local ZK by coping it from Global ZK
                         createPolicies(path, true).thenAccept(p -> {
                             LOG.info("Successfully created local policies for {} -- {}", path, p);
                             // local-policies have been created but it's not part of policiesCache. so, call
                             // super.getAsync() which will load it and set the watch on local-policies path
-                            super.getAsync(path);
-                            future.complete(p);
+                            super.getWithStatAsync(path);
+                            Stat stat = new Stat();
+                            stat.setVersion(-1);
+                            future.complete(Optional.of(Maps.immutableEntry(p.orElse(null), stat)));
                         }).exceptionally(ex -> {
                             future.completeExceptionally(ex);
                             return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 594b71762..fe307136f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -22,7 +22,6 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
@@ -37,12 +36,12 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -52,6 +51,7 @@
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.lookup.data.LookupData;
@@ -72,6 +72,7 @@
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,6 +112,8 @@
 
     private final String host;
 
+    private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
+
     public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
     public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
     public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
@@ -546,87 +549,141 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio
 
     /**
      * 1. split the given bundle into two bundles 2. assign ownership of both the bundles to current broker 3. update
-     * policies with newly created bundles into LocalZK 4. disable original bundle and refresh the cache
+     * policies with newly created bundles into LocalZK 4. disable original bundle and refresh the cache.
+     *
+     * It will call splitAndOwnBundleOnceAndRetry to do the real retry work, which will retry "retryTimes".
      *
      * @param bundle
      * @return
      * @throws Exception
      */
-    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, final boolean unload) throws Exception {
+    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload)
+        throws Exception {
 
         final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
+        final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
+
+        splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture);
+
+        return unloadFuture;
+    }
+
+    void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
+                                       boolean unload,
+                                       AtomicInteger counter,
+                                       CompletableFuture<Void> unloadFuture) {
+        CompletableFuture<NamespaceBundles> updateFuture = new CompletableFuture<>();
 
         final Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles = bundleFactory.splitBundles(bundle,
-                2 /* by default split into 2 */);
+            2 /* by default split into 2 */);
+
+        // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper.
         if (splittedBundles != null) {
             checkNotNull(splittedBundles.getLeft());
             checkNotNull(splittedBundles.getRight());
             checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles");
             NamespaceName nsname = bundle.getNamespaceObject();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {},  2 bundles: {}, {}",
+                    nsname.toString(), bundle.getBundleRange(), counter.get(),
+                    splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles",
+                    splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles");
+            }
             try {
                 // take ownership of newly split bundles
                 for (NamespaceBundle sBundle : splittedBundles.getRight()) {
                     checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle));
                 }
                 updateNamespaceBundles(nsname, splittedBundles.getLeft(),
-                        (rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> {
-                            if (rc == KeeperException.Code.OK.intValue()) {
-                                try {
-                                    // disable old bundle in memory
-                                    getOwnershipCache().updateBundleState(bundle, false);
-                                    // invalidate cache as zookeeper has new split
-                                    // namespace bundle
-                                    bundleFactory.invalidateBundleCache(nsname);
-                                    // update bundled_topic cache for load-report-generation
-                                    pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
-                                    loadManager.get().setLoadReportForceUpdateFlag();
-                                    unloadFuture.complete(null);
-                                } catch (Exception e) {
-                                    String msg1 = format(
-                                            "failed to disable bundle %s under namespace [%s] with error %s",
-                                            nsname.toString(), bundle.toString(), e.getMessage());
-                                    LOG.warn(msg1, e);
-                                    unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1));
-                                }
-                            } else {
-                                String msg2 = format("failed to update namespace [%s] policies due to %s",
-                                        nsname.toString(),
-                                        KeeperException.create(KeeperException.Code.get(rc)).getMessage());
-                                LOG.warn(msg2);
-                                unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
-                            }
-                        })));
+                    (rc, path, zkCtx, stat) ->  {
+                        if (rc == Code.OK.intValue()) {
+                            // invalidate cache as zookeeper has new split
+                            // namespace bundle
+                            bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+
+                            updateFuture.complete(splittedBundles.getLeft());
+                        } else if (rc == Code.BADVERSION.intValue()) {
+                            KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc));
+                            String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " +
+                                    "due to %s, counter: %d",
+                                nsname.toString(), bundle.getBundleRange(),
+                                keeperException.getMessage(), counter.get());
+                            LOG.warn(msg);
+                            updateFuture.completeExceptionally(new ServerMetadataException(keeperException));
+                        } else {
+                            String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s",
+                                nsname.toString(), bundle.getBundleRange(),
+                                KeeperException.create(KeeperException.Code.get(rc)).getMessage());
+                            LOG.warn(msg);
+                            updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
+                        }
+                    });
             } catch (Exception e) {
-                String msg = format("failed to aquire ownership of split bundle for namespace [%s], %s",
-                        nsname.toString(), e.getMessage());
+                String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s",
+                    nsname.toString(), e.getMessage());
                 LOG.warn(msg, e);
-                unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
+                updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
             }
-
         } else {
             String msg = format("bundle %s not found under namespace", bundle.toString());
-            unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
+            LOG.warn(msg);
+            updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
         }
 
-        return unloadFuture.thenApply(res -> {
-            if (!unload) {
-                return null;
+        // If success updateNamespaceBundles, then do invalidateBundleCache and unload.
+        // Else retry splitAndOwnBundleOnceAndRetry.
+        updateFuture.whenCompleteAsync((r, t)-> {
+            if (t != null) {
+                // retry several times on BadVersion
+                if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) {
+                    pulsar.getOrderedExecutor().submit(
+                        () -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
+                } else {
+                    // Retry enough, or meet other exception
+                    String msg2 = format(" %s not success update nsBundles, counter %d, reason %s",
+                        bundle.toString(), counter.get(), t.getMessage());
+                    LOG.warn(msg2);
+                    unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
+                }
+                return;
             }
-            // unload new split bundles
-            splittedBundles.getRight().forEach(splitBundle -> {
-                try {
-                    unloadNamespaceBundle(splitBundle);
-                } catch (Exception e) {
-                    LOG.warn("Failed to unload split bundle {}", splitBundle, e);
-                    throw new RuntimeException("Failed to unload split bundle " + splitBundle, e);
+
+            // success updateNamespaceBundles
+            try {
+                // disable old bundle in memory
+                getOwnershipCache().updateBundleState(bundle, false);
+
+                // update bundled_topic cache for load-report-generation
+                pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
+                loadManager.get().setLoadReportForceUpdateFlag();
+
+                if (unload) {
+                    // unload new split bundles
+                    r.getBundles().forEach(splitBundle -> {
+                        try {
+                            unloadNamespaceBundle(splitBundle);
+                        } catch (Exception e) {
+                            LOG.warn("Failed to unload split bundle {}", splitBundle, e);
+                            throw new RuntimeException("Failed to unload split bundle " + splitBundle, e);
+                        }
+                    });
                 }
-            });
-            return null;
-        });
+
+                unloadFuture.complete(null);
+            } catch (Exception e) {
+                String msg1 = format(
+                    "failed to disable bundle %s under namespace [%s] with error %s",
+                    bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage());
+                LOG.warn(msg1, e);
+                unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1));
+            }
+            return;
+        }, pulsar.getOrderedExecutor());
     }
 
     /**
-     * update new bundle-range to LocalZk (create a new node if not present)
+     * Update new bundle-range to LocalZk (create a new node if not present).
+     * Update may fail because of concurrent write to Zookeeper.
      *
      * @param nsname
      * @param nsBundles
@@ -643,12 +700,16 @@ private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBun
         if (!policies.isPresent()) {
             // if policies is not present into localZk then create new policies
             this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS);
-            policies = this.pulsar.getLocalZkCacheService().policiesCache().get(path);
         }
 
-        policies.get().bundles = getBundlesData(nsBundles);
-        this.pulsar.getLocalZkCache().getZooKeeper().setData(path,
-                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies.get()), -1, callback, null);
+        long version = nsBundles.getVersion();
+        LocalPolicies local = new LocalPolicies();
+        local.bundles = getBundlesData(nsBundles);
+        byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(local);
+
+        this.pulsar.getLocalZkCache().getZooKeeper()
+            .setData(path, data, Math.toIntExact(version), callback, null);
+
         // invalidate namespace's local-policies
         this.pulsar.getLocalZkCacheService().policiesCache().invalidate(path);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 577fab32e..f77e41875 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -30,6 +30,7 @@
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1022,7 +1023,17 @@ public AuthenticationService getAuthenticationService() {
     }
 
     public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
-        return multiLayerTopicsMap.get(namespace).get(bundle).values();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> map1 = multiLayerTopicsMap.get(namespace);
+        if (map1 == null) {
+            return Collections.emptyList();
+        }
+
+        ConcurrentOpenHashMap<String, Topic> map2 = map1.get(bundle);
+        if (map2 == null) {
+            return Collections.emptyList();
+        }
+
+        return map2.values();
     }
 
     public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 44dd68670..62670aaf3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -28,6 +28,8 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.SortedSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -78,10 +80,19 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
 
             CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
             // Read the static bundle data from the policies
-            pulsar.getLocalZkCacheService().policiesCache().getAsync(path).thenAccept(policies -> {
+            pulsar.getLocalZkCacheService().policiesCache().getWithStatAsync(path).thenAccept(result -> {
                 // If no policies defined for namespace, assume 1 single bundle
-                BundlesData bundlesData = policies.map(p -> p.bundles).orElse(null);
-                NamespaceBundles namespaceBundles = getBundles(namespace, bundlesData);
+                BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null);
+                NamespaceBundles namespaceBundles = getBundles(
+                    namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1));
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Get bundles from getLocalZkCacheService: path: {},  bundles: {}, version: {}",
+                        namespace, path,
+                        (bundlesData != null && bundlesData.boundaries != null) ? bundlesData.toString() : "null",
+                        namespaceBundles.getVersion());
+                }
+
                 future.complete(namespaceBundles);
             }).exceptionally(ex -> {
                 future.completeExceptionally(ex);
@@ -157,7 +168,7 @@ public NamespaceBundle getBundle(String namespace, String bundleRange) {
                 (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
         return getBundle(NamespaceName.get(namespace), hashRange);
     }
-    
+
     public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
         return bundlesCache.synchronous().get(fqnn).getFullBundle();
     }
@@ -167,6 +178,10 @@ public long getLongHashCode(String name) {
     }
 
     public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
+        return getBundles(nsname, bundleData, -1);
+    }
+
+    public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData, long version) {
         long[] partitions;
         if (bundleData == null) {
             partitions = new long[] { Long.decode(FIRST_BOUNDARY), Long.decode(LAST_BOUNDARY) };
@@ -176,7 +191,7 @@ public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData)
                 partitions[i] = Long.decode(bundleData.boundaries.get(i));
             }
         }
-        return new NamespaceBundles(nsname, partitions, this);
+        return new NamespaceBundles(nsname, partitions, this, version);
     }
 
     public static BundlesData getBundlesData(NamespaceBundles bundles) throws Exception {
@@ -199,10 +214,8 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws Except
      *            split into numBundles
      * @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains final bundles including
      *         split bundles for a given namespace
-     * @throws Exception
      */
-    public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundle targetBundle, int numBundles)
-            throws Exception {
+    public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundle targetBundle, int numBundles) {
         checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle);
         checkNotNull(targetBundle, "can't split null bundle");
         checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
@@ -234,7 +247,8 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws Except
         }
         partitions[pos] = sourceBundle.partitions[lastIndex];
         if (splitPartition != -1) {
-            NamespaceBundles splittedNsBundles = new NamespaceBundles(nsname, partitions, this);
+            // keep version of sourceBundle
+            NamespaceBundles splittedNsBundles = new NamespaceBundles(nsname, partitions, this, sourceBundle.getVersion());
             List<NamespaceBundle> splittedBundles = splittedNsBundles.getBundles().subList(splitPartition,
                     (splitPartition + numBundles));
             return new ImmutablePair<NamespaceBundles, List<NamespaceBundle>>(splittedNsBundles, splittedBundles);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index 43fb21914..60b0e7d35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -26,9 +26,6 @@
 import java.util.List;
 import java.util.SortedSet;
 
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.naming.NamespaceName;
-
 import com.google.common.base.Objects;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
@@ -38,6 +35,8 @@
     private final NamespaceName nsname;
     private final ArrayList<NamespaceBundle> bundles;
     private final NamespaceBundleFactory factory;
+    private final long version;
+
     protected final long[] partitions;
 
     public static final Long FULL_LOWER_BOUND = 0x00000000L;
@@ -49,16 +48,17 @@ public NamespaceBundles(NamespaceName nsname, SortedSet<Long> partitionsSet, Nam
         this(nsname, convertPartitions(partitionsSet), factory);
     }
 
-    public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundleFactory factory) {
+    public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundleFactory factory, long version) {
         // check input arguments
         this.nsname = checkNotNull(nsname);
         this.factory = checkNotNull(factory);
+        this.version = version;
         checkArgument(partitions.length > 0, "Can't create bundles w/o partition boundaries");
 
         // calculate bundles based on partition boundaries
         this.bundles = Lists.newArrayList();
         fullBundle = new NamespaceBundle(nsname,
-                Range.range(FULL_LOWER_BOUND, BoundType.CLOSED, FULL_UPPER_BOUND, BoundType.CLOSED), factory);
+            Range.range(FULL_LOWER_BOUND, BoundType.CLOSED, FULL_UPPER_BOUND, BoundType.CLOSED), factory);
 
         if (partitions.length > 0) {
             if (partitions.length == 1) {
@@ -86,6 +86,10 @@ public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundle
         }
     }
 
+    public NamespaceBundles(NamespaceName nsname, long[] partitions, NamespaceBundleFactory factory) {
+        this(nsname, partitions, factory, -1);
+    }
+
     public NamespaceBundle findBundle(TopicName topicName) {
         checkArgument(this.nsname.equals(topicName.getNamespaceObject()));
         long hashCode = factory.getLongHashCode(topicName.toString());
@@ -140,4 +144,8 @@ public boolean equals(Object obj) {
         }
         return false;
     }
+
+    public long getVersion() {
+        return version;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index fe89a327e..a49faa288 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -28,18 +28,23 @@
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -106,6 +111,7 @@
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 
+@Slf4j
 public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(AdminApiTest.class);
@@ -932,6 +938,115 @@ public void testNamespaceSplitBundle() throws Exception {
         producer.close();
     }
 
+    @Test
+    public void testNamespaceSplitBundleConcurrent() throws Exception {
+        // Force to create a topic
+        final String namespace = "prop-xyz/use/ns1";
+        final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        producer.send("message".getBytes());
+        publishMessagesOnPersistentTopic(topicName, 0);
+        assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName));
+
+        try {
+            admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false);
+        } catch (Exception e) {
+            fail("split bundle shouldn't have thrown exception");
+        }
+
+        // bundle-factory cache must have updated split bundles
+        NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
+        String[] splitRange = {namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff"};
+        for (int i = 0; i < bundles.getBundles().size(); i++) {
+            assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
+        }
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+
+        try {
+            executorService.invokeAll(
+                Arrays.asList(
+                    () ->
+                    {
+                        log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff ");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff ");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false);
+                        return null;
+                    }
+                )
+            );
+        } catch (Exception e) {
+            fail("split bundle shouldn't have thrown exception");
+        }
+
+        String[] splitRange4 = {
+            namespace + "/0x00000000_0x3fffffff",
+            namespace + "/0x3fffffff_0x7fffffff",
+            namespace + "/0x7fffffff_0xbfffffff",
+            namespace + "/0xbfffffff_0xffffffff"};
+        bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
+        assertEquals(bundles.getBundles().size(), 4);
+        for (int i = 0; i < bundles.getBundles().size(); i++) {
+            assertEquals(bundles.getBundles().get(i).toString(), splitRange4[i]);
+        }
+
+        try {
+            executorService.invokeAll(
+                Arrays.asList(
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff ");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff ");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff ");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false);
+                        return null;
+                    },
+                    () ->
+                    {
+                        log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff ");
+                        admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false);
+                        return null;
+                    }
+                )
+            );
+        } catch (Exception e) {
+            fail("split bundle shouldn't have thrown exception");
+        }
+
+        String[] splitRange8 = {
+            namespace + "/0x00000000_0x1fffffff",
+            namespace + "/0x1fffffff_0x3fffffff",
+            namespace + "/0x3fffffff_0x5fffffff",
+            namespace + "/0x5fffffff_0x7fffffff",
+            namespace + "/0x7fffffff_0x9fffffff",
+            namespace + "/0x9fffffff_0xbfffffff",
+            namespace + "/0xbfffffff_0xdfffffff",
+            namespace + "/0xdfffffff_0xffffffff"};
+        bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
+        assertEquals(bundles.getBundles().size(), 8);
+        for (int i = 0; i < bundles.getBundles().size(); i++) {
+            assertEquals(bundles.getBundles().get(i).toString(), splitRange8[i]);
+        }
+
+        producer.close();
+    }
+
     @Test
     public void testNamespaceUnloadBundle() throws Exception {
         assertEquals(admin.persistentTopics().getList("prop-xyz/use/ns1"), Lists.newArrayList());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 066693e34..01e388a33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -196,13 +196,9 @@ public void testSplitMapWithRefreshedStatMap() throws Exception {
             fail("split bundle faild", e);
         }
 
-        try {
-            // old bundle should be removed from status-map
-            list = this.pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(nspace, originalBundle.toString());
-            fail();
-        } catch (NullPointerException ne) {
-            // OK
-        }
+        // old bundle should be removed from status-map
+        list = this.pulsar.getBrokerService().getAllTopicsFromNamespaceBundle(nspace, originalBundle.toString());
+        assertTrue(list.isEmpty());
 
         // status-map should be updated with new split bundles
         NamespaceBundle splitBundle = pulsar.getNamespaceService().getBundle(topicName);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
index 5c69d4331..defed3ae9 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
@@ -73,6 +73,14 @@ public ZooKeeperDataCache(final ZooKeeperCache cache) {
         return future;
     }
 
+    public CompletableFuture<Optional<Entry<T, Stat>>> getWithStatAsync(String path) {
+        return cache.getDataAsync(path, this, this).whenComplete((entry, ex) -> {
+            if (ex != null) {
+                cache.asyncInvalidate(path);
+            }
+        });
+    }
+
     /**
      * Return an item from the cache
      *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services