You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:01:08 UTC

[pulsar] 05/07: When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ec86a9de598853fb38c2122730f035e8345b6165
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Jun 17 10:20:45 2021 +0800

    When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)
    
    Fixes #10673
    
    ### Motivation
    When updating the number of partitions, we need to update the data in two places in zk:
    ```
    /admin/partitioned-topics
    /managed-ledgers/
    ```
    
    Currently, only the number of partitions in `/admin/partitioned-topics` is updated, so unless a Producer or Consumer is created, the data obtained in another cluster will be incorrect.
    
    ### Modifications
    1) Try to create the managedLedger when updating the number of partitions
    2) Ensure that the number of partitions in `/admin/partitioned-topics` is updated every time
    
    (cherry picked from commit 202da117b529b24bdf9c994750266dac597294a8)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  3 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 37 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8bf5a7c..97f4a8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -422,6 +422,7 @@ public class PersistentTopicsBase extends AdminResource {
                 throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
             }
             try {
+                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
                 createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
             } catch (Exception e) {
                 if (e.getCause() instanceof RestException) {
@@ -435,7 +436,7 @@ public class PersistentTopicsBase extends AdminResource {
             if (!updateLocalTopicOnly) {
                 CompletableFuture<Void> updatePartition = new CompletableFuture<>();
                 final String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                updatePartitionInOtherCluster(numPartitions, clusters).thenAccept((res) -> {
+                updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
                     try {
                         namespaceResources().getPartitionedTopicResources().setAsync(path, (p) -> {
                             return new PartitionedTopicMetadata(numPartitions);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index ead91a5..e5219d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Method;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -803,6 +804,42 @@ public class ReplicatorTest extends ReplicatorTestBase {
         reader2.closeAsync().get();
     }
 
+    @Test
+    public void testReplicatorWithPartitionedTopic() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String persistentTopicName = "persistent://" + namespace + "/partTopic" + UUID.randomUUID();
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+        // Create partitioned-topic from R1
+        admin1.topics().createPartitionedTopic(persistentTopicName, 3);
+        // List partitioned topics from R2
+        Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace)));
+        Awaitility.await().untilAsserted(() -> assertEquals(
+                admin2.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+        assertEquals(admin1.topics().getList(namespace).size(), 3);
+        // List partitioned topics from R3
+        Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace)));
+        Awaitility.await().untilAsserted(() -> assertEquals(
+                admin3.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+        // Update partitioned topic from R2
+        admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+        assertEquals(admin2.topics().getList(namespace).size(), 5);
+        // Update partitioned topic from R3
+        admin3.topics().updatePartitionedTopic(persistentTopicName, 5);
+        assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+        assertEquals(admin3.topics().getList(namespace).size(), 5);
+        // Update partitioned topic from R1
+        admin1.topics().updatePartitionedTopic(persistentTopicName, 6);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+        assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+        assertEquals(admin1.topics().getList(namespace).size(), 6);
+        assertEquals(admin2.topics().getList(namespace).size(), 6);
+        assertEquals(admin3.topics().getList(namespace).size(), 6);
+    }
+
     /**
      * It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix)
      *