You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/10/09 13:24:34 UTC

[pulsar] branch master updated: [fix][broker] Update new bundle-range to policies after bundle split (#17797)

This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7df4ee99109 [fix][broker] Update new bundle-range to  policies after bundle split (#17797)
7df4ee99109 is described below

commit 7df4ee99109cfcbe0ff361bba6a68f8dc02a0edc
Author: LinChen <15...@qq.com>
AuthorDate: Sun Oct 9 21:24:24 2022 +0800

    [fix][broker] Update new bundle-range to  policies after bundle split (#17797)
    
    Co-authored-by: leolinchen <le...@tencent.com>
---
 .../pulsar/broker/namespace/NamespaceService.java  | 41 +++++++++++++++++++---
 .../broker/namespace/NamespaceServiceTest.java     | 11 ++++--
 2 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5c1cb283d69..acafe28f1e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -880,11 +881,12 @@ public class NamespaceService implements AutoCloseable {
                                     for (NamespaceBundle sBundle : splittedBundles.getRight()) {
                                         Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle));
                                     }
-                                    updateNamespaceBundles(nsname, splittedBundles.getLeft())
-                                            .thenRun(() -> {
-                                                bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
-                                                updateFuture.complete(splittedBundles.getRight());
-                                            }).exceptionally(ex1 -> {
+                                    updateNamespaceBundles(nsname, splittedBundles.getLeft()).thenCompose(__ -> {
+                                        return updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft());
+                                    }).thenRun(() -> {
+                                        bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+                                        updateFuture.complete(splittedBundles.getRight());
+                                    }).exceptionally(ex1 -> {
                                         String msg = format("failed to update namespace policies [%s], "
                                                         + "NamespaceBundle: %s due to %s",
                                                 nsname.toString(), bundle.getBundleRange(), ex1.getMessage());
@@ -959,6 +961,35 @@ public class NamespaceService implements AutoCloseable {
         });
     }
 
+    /**
+     * Update new bundle-range to admin/policies/namespace.
+     * Update may fail because of concurrent write to Zookeeper.
+     *
+     * @param nsname
+     * @param nsBundles
+     * @throws Exception
+     */
+    private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
+                                                                      NamespaceBundles nsBundles) {
+        Objects.requireNonNull(nsname);
+        Objects.requireNonNull(nsBundles);
+
+        return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(policies -> {
+            if (policies.isPresent()) {
+                return pulsar.getPulsarResources().getNamespaceResources().setPoliciesAsync(nsname, oldPolicies -> {
+                    oldPolicies.bundles = nsBundles.getBundlesData();
+                    return oldPolicies;
+                });
+            } else {
+                LOG.error("Policies of namespace {} is not exist!", nsname);
+                Policies newPolicies = new Policies();
+                newPolicies.bundles = nsBundles.getBundlesData();
+                return pulsar.getPulsarResources().getNamespaceResources().createPoliciesAsync(nsname, newPolicies);
+            }
+        });
+    }
+
+
     /**
      * Update new bundle-range to LocalZk (create a new node if not present).
      * Update may fail because of concurrent write to Zookeeper.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index ca8408c468d..eadde5e3fc4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -155,11 +155,16 @@ public class NamespaceServiceTest extends BrokerTestBase {
         splitBundleSet.removeAll(bundleList);
         assertTrue(splitBundleSet.isEmpty());
 
-        // (2) validate LocalZookeeper policies updated with newly created split
+        // (2) validate localPolicies and policies  updated with newly created split
         // bundles
-        LocalPolicies policies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
-        NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles);
+        LocalPolicies localPolicies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
+        NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, localPolicies.bundles);
         assertEquals(localZkBundles, updatedNsBundles);
+        log.info("LocalPolicies: {}", localPolicies);
+
+        Policies policies = pulsar.getPulsarResources().getNamespaceResources().getPolicies(nsname).get();
+        NamespaceBundles zkBundles = bundleFactory.getBundles(nsname, policies.bundles);
+        assertEquals(zkBundles, updatedNsBundles);
         log.info("Policies: {}", policies);
 
         // (3) validate ownership of new split bundles by local owner