You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:01:08 UTC
[pulsar] 05/07: When the Replicator is enabled,
no managedLedger is created when updating the number of partitions
(#10910)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ec86a9de598853fb38c2122730f035e8345b6165
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Jun 17 10:20:45 2021 +0800
When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)
Fixes #10673
### Motivation
When updating the number of partitions, we need to update the data in two places in zk:
```
/admin/partitioned-topics
/managed-ledgers/
```
Now we only update the number of partitions in `/admin/partitioned-topics`, so if we do not create a Producer or Consumer, the data obtained in another cluster will be incorrect
### Modifications
1)Try to create managedLedger when updating the number of partitions
2)Ensure that the number of partitions in `/admin/partitioned-topics` is updated every time
(cherry picked from commit 202da117b529b24bdf9c994750266dac597294a8)
---
.../broker/admin/impl/PersistentTopicsBase.java | 3 +-
.../pulsar/broker/service/ReplicatorTest.java | 37 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8bf5a7c..97f4a8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -422,6 +422,7 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
}
try {
+ tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
@@ -435,7 +436,7 @@ public class PersistentTopicsBase extends AdminResource {
if (!updateLocalTopicOnly) {
CompletableFuture<Void> updatePartition = new CompletableFuture<>();
final String path = ZkAdminPaths.partitionedTopicPath(topicName);
- updatePartitionInOtherCluster(numPartitions, clusters).thenAccept((res) -> {
+ updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
try {
namespaceResources().getPartitionedTopicResources().setAsync(path, (p) -> {
return new PartitionedTopicMetadata(numPartitions);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index ead91a5..e5219d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Method;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -803,6 +804,42 @@ public class ReplicatorTest extends ReplicatorTestBase {
reader2.closeAsync().get();
}
+ @Test
+ public void testReplicatorWithPartitionedTopic() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String persistentTopicName = "persistent://" + namespace + "/partTopic" + UUID.randomUUID();
+
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+ // Create partitioned-topic from R1
+ admin1.topics().createPartitionedTopic(persistentTopicName, 3);
+ // List partitioned topics from R2
+ Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace)));
+ Awaitility.await().untilAsserted(() -> assertEquals(
+ admin2.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+ assertEquals(admin1.topics().getList(namespace).size(), 3);
+ // List partitioned topics from R3
+ Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace)));
+ Awaitility.await().untilAsserted(() -> assertEquals(
+ admin3.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+ // Update partitioned topic from R2
+ admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
+ assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+ assertEquals(admin2.topics().getList(namespace).size(), 5);
+ // Update partitioned topic from R3
+ admin3.topics().updatePartitionedTopic(persistentTopicName, 5);
+ assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+ assertEquals(admin3.topics().getList(namespace).size(), 5);
+ // Update partitioned topic from R1
+ admin1.topics().updatePartitionedTopic(persistentTopicName, 6);
+ assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+ assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+ assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+ assertEquals(admin1.topics().getList(namespace).size(), 6);
+ assertEquals(admin2.topics().getList(namespace).size(), 6);
+ assertEquals(admin3.topics().getList(namespace).size(), 6);
+ }
+
/**
* It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix)
*