You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/10/09 13:24:34 UTC
[pulsar] branch master updated: [fix][broker] Update new bundle-range to policies after bundle split (#17797)
This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7df4ee99109 [fix][broker] Update new bundle-range to policies after bundle split (#17797)
7df4ee99109 is described below
commit 7df4ee99109cfcbe0ff361bba6a68f8dc02a0edc
Author: LinChen <15...@qq.com>
AuthorDate: Sun Oct 9 21:24:24 2022 +0800
[fix][broker] Update new bundle-range to policies after bundle split (#17797)
Co-authored-by: leolinchen <le...@tencent.com>
---
.../pulsar/broker/namespace/NamespaceService.java | 41 +++++++++++++++++++---
.../broker/namespace/NamespaceServiceTest.java | 11 ++++--
2 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5c1cb283d69..acafe28f1e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.FutureUtil;
@@ -880,11 +881,12 @@ public class NamespaceService implements AutoCloseable {
for (NamespaceBundle sBundle : splittedBundles.getRight()) {
Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle));
}
- updateNamespaceBundles(nsname, splittedBundles.getLeft())
- .thenRun(() -> {
- bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
- updateFuture.complete(splittedBundles.getRight());
- }).exceptionally(ex1 -> {
+ updateNamespaceBundles(nsname, splittedBundles.getLeft()).thenCompose(__ -> {
+ return updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft());
+ }).thenRun(() -> {
+ bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+ updateFuture.complete(splittedBundles.getRight());
+ }).exceptionally(ex1 -> {
String msg = format("failed to update namespace policies [%s], "
+ "NamespaceBundle: %s due to %s",
nsname.toString(), bundle.getBundleRange(), ex1.getMessage());
@@ -959,6 +961,35 @@ public class NamespaceService implements AutoCloseable {
});
}
+ /**
+ * Writes the given bundle ranges into the namespace policies (admin/policies/namespace).
+ * Overwrites {@code bundles} on existing policies; if none exist, logs an error and creates them.
+ *
+ * @param nsname namespace whose policies are updated; must not be null
+ * @param nsBundles bundles whose {@code getBundlesData()} is stored; must not be null
+ * @return future of the set/create policies call; may fail on a concurrent write to Zookeeper
+ */
+ private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
+ NamespaceBundles nsBundles) {
+ Objects.requireNonNull(nsname);
+ Objects.requireNonNull(nsBundles);
+
+ return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(policies -> {
+ if (policies.isPresent()) {
+ return pulsar.getPulsarResources().getNamespaceResources().setPoliciesAsync(nsname, oldPolicies -> {
+ oldPolicies.bundles = nsBundles.getBundlesData();
+ return oldPolicies;
+ });
+ } else {
+ LOG.error("Policies of namespace {} is not exist!", nsname);
+ Policies newPolicies = new Policies();
+ newPolicies.bundles = nsBundles.getBundlesData();
+ return pulsar.getPulsarResources().getNamespaceResources().createPoliciesAsync(nsname, newPolicies);
+ }
+ });
+ }
+
+
/**
* Update new bundle-range to LocalZk (create a new node if not present).
* Update may fail because of concurrent write to Zookeeper.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index ca8408c468d..eadde5e3fc4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -155,11 +155,16 @@ public class NamespaceServiceTest extends BrokerTestBase {
splitBundleSet.removeAll(bundleList);
assertTrue(splitBundleSet.isEmpty());
- // (2) validate LocalZookeeper policies updated with newly created split
+ // (2) validate localPolicies and policies updated with newly created split
// bundles
- LocalPolicies policies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
- NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles);
+ LocalPolicies localPolicies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
+ NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, localPolicies.bundles);
assertEquals(localZkBundles, updatedNsBundles);
+ log.info("LocalPolicies: {}", localPolicies);
+
+ Policies policies = pulsar.getPulsarResources().getNamespaceResources().getPolicies(nsname).get();
+ NamespaceBundles zkBundles = bundleFactory.getBundles(nsname, policies.bundles);
+ assertEquals(zkBundles, updatedNsBundles);
log.info("Policies: {}", policies);
// (3) validate ownership of new split bundles by local owner