You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/17 03:12:33 UTC
[pulsar] branch branch-2.7 updated: When the Replicator is enabled,
no managedLedger is created when updating the number of partitions
(#10910)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 7a6c179 When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)
7a6c179 is described below
commit 7a6c17971fc8d04578fe3dfca49e1003a2c4eda2
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Jun 17 10:20:45 2021 +0800
When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910)
Fixes #10673
When updating the number of partitions, we need to update the data in two places in ZooKeeper (zk):
```
/admin/partitioned-topics
/managed-ledgers/
```
Currently, we only update the number of partitions in `/admin/partitioned-topics`, so unless a Producer or Consumer is created, the data obtained from another cluster will be incorrect.
1) Try to create the managedLedger when updating the number of partitions
2) Ensure that the number of partitions in `/admin/partitioned-topics` is updated every time
(cherry picked from commit 202da117b529b24bdf9c994750266dac597294a8)
---
.../broker/admin/impl/PersistentTopicsBase.java | 3 +-
.../pulsar/broker/service/ReplicatorTest.java | 74 ++++++++++++++++------
2 files changed, 58 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d842a77..c89e780 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -507,6 +507,7 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
}
try {
+ tryCreatePartitionsAsync(numPartitions).get();
createSubscriptions(topicName, numPartitions).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
@@ -520,7 +521,7 @@ public class PersistentTopicsBase extends AdminResource {
if (!updateLocalTopicOnly) {
CompletableFuture<Void> updatePartition = new CompletableFuture<>();
final String path = ZkAdminPaths.partitionedTopicPath(topicName);
- updatePartitionInOtherCluster(numPartitions, clusters).thenAccept((res) -> {
+ updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
try {
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 41d085b..fc58dac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -35,6 +36,7 @@ import java.lang.reflect.Method;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -162,12 +164,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
+ assertNotNull(replicationClients1.get("r2"));
+ assertNotNull(replicationClients1.get("r3"));
+ assertNotNull(replicationClients2.get("r1"));
+ assertNotNull(replicationClients2.get("r3"));
+ assertNotNull(replicationClients3.get("r1"));
+ assertNotNull(replicationClients3.get("r2"));
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
@@ -176,12 +178,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
Thread.sleep(1000L);
// Make sure that the internal replicators map still contains remote cluster info
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
+ assertNotNull(replicationClients1.get("r2"));
+ assertNotNull(replicationClients1.get("r3"));
+ assertNotNull(replicationClients2.get("r1"));
+ assertNotNull(replicationClients2.get("r3"));
+ assertNotNull(replicationClients3.get("r1"));
+ assertNotNull(replicationClients3.get("r2"));
// Case 2: Update the configuration back
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
@@ -190,12 +192,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
Thread.sleep(1000L);
// Make sure that the internal replicators map still contains remote cluster info
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
+ assertNotNull(replicationClients1.get("r2"));
+ assertNotNull(replicationClients1.get("r3"));
+ assertNotNull(replicationClients2.get("r1"));
+ assertNotNull(replicationClients2.get("r3"));
+ assertNotNull(replicationClients3.get("r1"));
+ assertNotNull(replicationClients3.get("r2"));
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}
@@ -793,6 +795,42 @@ public class ReplicatorTest extends ReplicatorTestBase {
reader2.closeAsync().get();
}
+ @Test
+ public void testReplicatorWithPartitionedTopic() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String persistentTopicName = "persistent://" + namespace + "/partTopic" + UUID.randomUUID();
+
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+ // Create partitioned-topic from R1
+ admin1.topics().createPartitionedTopic(persistentTopicName, 3);
+ // List partitioned topics from R2
+ Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace)));
+ Awaitility.await().untilAsserted(() -> assertEquals(
+ admin2.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+ assertEquals(admin1.topics().getList(namespace).size(), 3);
+ // List partitioned topics from R3
+ Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace)));
+ Awaitility.await().untilAsserted(() -> assertEquals(
+ admin3.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName));
+ // Update partitioned topic from R2
+ admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
+ assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+ assertEquals(admin2.topics().getList(namespace).size(), 5);
+ // Update partitioned topic from R3
+ admin3.topics().updatePartitionedTopic(persistentTopicName, 5);
+ assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
+ assertEquals(admin3.topics().getList(namespace).size(), 5);
+ // Update partitioned topic from R1
+ admin1.topics().updatePartitionedTopic(persistentTopicName, 6);
+ assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+ assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+ assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
+ assertEquals(admin1.topics().getList(namespace).size(), 6);
+ assertEquals(admin2.topics().getList(namespace).size(), 6);
+ assertEquals(admin3.topics().getList(namespace).size(), 6);
+ }
+
/**
* It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix)
*