You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/17 03:12:33 UTC

[pulsar] branch branch-2.7 updated: When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 7a6c179  When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)
7a6c179 is described below

commit 7a6c17971fc8d04578fe3dfca49e1003a2c4eda2
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Jun 17 10:20:45 2021 +0800

    When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)
    
    Fixes #10673
    
    When updating the number of partitions, we need to update the data in two places in ZooKeeper:
    ```
    /admin/partitioned-topics
    /managed-ledgers/
    ```
    
    Currently, only the number of partitions in `/admin/partitioned-topics` is updated, so unless a Producer or Consumer is created, the data obtained from another cluster will be incorrect.
    
    1) Try to create the managedLedger when updating the number of partitions.
    2) Ensure that the number of partitions in `/admin/partitioned-topics` is updated every time.
    
    (cherry picked from commit 202da117b529b24bdf9c994750266dac597294a8)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  3 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 74 ++++++++++++++++------
 2 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d842a77..c89e780 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -507,6 +507,7 @@ public class PersistentTopicsBase extends AdminResource {
                 throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
             }
             try {
+                tryCreatePartitionsAsync(numPartitions).get();
                 createSubscriptions(topicName, numPartitions).get();
             } catch (Exception e) {
                 if (e.getCause() instanceof RestException) {
@@ -520,7 +521,7 @@ public class PersistentTopicsBase extends AdminResource {
             if (!updateLocalTopicOnly) {
                 CompletableFuture<Void> updatePartition = new CompletableFuture<>();
                 final String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                updatePartitionInOtherCluster(numPartitions, clusters).thenAccept((res) -> {
+                updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
                     try {
                         byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
                         globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 41d085b..fc58dac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -35,6 +36,7 @@ import java.lang.reflect.Method;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -162,12 +164,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
         ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
         ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
 
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
+        assertNotNull(replicationClients1.get("r2"));
+        assertNotNull(replicationClients1.get("r3"));
+        assertNotNull(replicationClients2.get("r1"));
+        assertNotNull(replicationClients2.get("r3"));
+        assertNotNull(replicationClients3.get("r1"));
+        assertNotNull(replicationClients3.get("r2"));
 
         // Case 1: Update the global namespace replication configuration to only contains the local cluster itself
         admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
@@ -176,12 +178,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Thread.sleep(1000L);
 
         // Make sure that the internal replicators map still contains remote cluster info
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
+        assertNotNull(replicationClients1.get("r2"));
+        assertNotNull(replicationClients1.get("r3"));
+        assertNotNull(replicationClients2.get("r1"));
+        assertNotNull(replicationClients2.get("r3"));
+        assertNotNull(replicationClients3.get("r1"));
+        assertNotNull(replicationClients3.get("r2"));
 
         // Case 2: Update the configuration back
         admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
@@ -190,12 +192,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Thread.sleep(1000L);
 
         // Make sure that the internal replicators map still contains remote cluster info
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
+        assertNotNull(replicationClients1.get("r2"));
+        assertNotNull(replicationClients1.get("r3"));
+        assertNotNull(replicationClients2.get("r1"));
+        assertNotNull(replicationClients2.get("r3"));
+        assertNotNull(replicationClients3.get("r1"));
+        assertNotNull(replicationClients3.get("r2"));
 
         // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
     }
@@ -793,6 +795,42 @@ public class ReplicatorTest extends ReplicatorTestBase {
         reader2.closeAsync().get();
     }
 
+    @Test
+    public void testReplicatorWithPartitionedTopic() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String persistentTopicName = "persistent://" + namespace + "/partTopic" + UUID.randomUUID();
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+        // Create partitioned-topic from R1
+        admin1.topics().createPartitionedTopic(persistentTopicName, 3);
+        // List partitioned topics from R2
+        Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace)));
+        Awaitility.await().untilAsserted(() -> assertEquals(
+                admin2.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+        assertEquals(admin1.topics().getList(namespace).size(), 3);
+        // List partitioned topics from R3
+        Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace)));
+        Awaitility.await().untilAsserted(() -> assertEquals(
+                admin3.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+        // Update partitioned topic from R2
+        admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+        assertEquals(admin2.topics().getList(namespace).size(), 5);
+        // Update partitioned topic from R3
+        admin3.topics().updatePartitionedTopic(persistentTopicName, 5);
+        assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+        assertEquals(admin3.topics().getList(namespace).size(), 5);
+        // Update partitioned topic from R1
+        admin1.topics().updatePartitionedTopic(persistentTopicName, 6);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+        assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+        assertEquals(admin1.topics().getList(namespace).size(), 6);
+        assertEquals(admin2.topics().getList(namespace).size(), 6);
+        assertEquals(admin3.topics().getList(namespace).size(), 6);
+    }
+
     /**
      * It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix)
      *