You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/05 12:18:25 UTC

[pulsar] 02/03: Fix the partition number not equals expected error (#9446)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1f17b1a08482cda96e944a0331277c7429a276bd
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Feb 5 14:46:47 2021 +0800

    Fix the partition number not equals expected error (#9446)
    
    Fixes #8000
    
    ### Motivation
    
    Fix the partition number not equals expected error
    
    ### Verifying this change
    
    New tests added, without this fix, you can see errors like
    `topics consumer java.lang.IllegalStateException: allTopicPartitionsNumber 2 not equals expected: 5`
    
    (cherry picked from commit bbce00a2245cf05b829182a9a75a86d4e1139492)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 45 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  8 +++-
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 227e74d..17da94a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1170,4 +1171,48 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = testTimeout)
+    public void testPartitionsUpdatesForMultipleTopics() throws Exception {
+        final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
+        final String subName = "my-sub";
+        admin.topics().createPartitionedTopic(topicName0, 2);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(topicName0).partitions, 2);
+
+        PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
+                .topicsPattern("persistent://public/default/test.*")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(subName)
+                .subscribe();
+
+        Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 2);
+        Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);
+
+        admin.topics().updatePartitionedTopic(topicName0, 5);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 5);
+        });
+
+        final String topicName1 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-1";
+        admin.topics().createPartitionedTopic(topicName1, 3);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3);
+
+        consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8);
+        });
+
+        admin.topics().updatePartitionedTopic(topicName1, 5);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10);
+        });
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4149b39..55ee806 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1154,6 +1154,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return consumers.values().stream().collect(Collectors.toList());
     }
 
+    // get all partitions that in the topics map
+    int getPartitionsOfTheTopicMap() {
+        return topics.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
     @Override
     public void pause() {
         synchronized (pauseMutex) {
@@ -1228,7 +1233,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 future.complete(null);
                 return future;
             } else if (oldPartitionNumber < currentPartitionNumber) {
-                allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber);
+                allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber);
+                topics.put(topicName, currentPartitionNumber);
                 List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber);
                 // subscribe new added partitions
                 List<CompletableFuture<Consumer<T>>> futureList = newPartitions