You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/05 06:47:26 UTC
[pulsar] branch master updated: Fix the partition number not equals
expected error (#9446)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bbce00a Fix the partition number not equals expected error (#9446)
bbce00a is described below
commit bbce00a2245cf05b829182a9a75a86d4e1139492
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Feb 5 14:46:47 2021 +0800
Fix the partition number not equals expected error (#9446)
Fixes #8000
### Motivation
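Fix the "partition number not equals expected" error. When a topic's partition count increased, `allTopicPartitionsNumber` was updated with `compareAndSet(oldPartitionNumber, currentPartitionNumber)`, which only succeeds when the total across all topics happens to equal that single topic's old partition count, and the per-topic entry in the `topics` map was never refreshed. The total is now increased by the number of newly added partitions (`currentPartitionNumber - oldPartitionNumber`), and the topic's entry in `topics` is updated to the new partition count.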
### Verifying this change
New tests were added. Without this fix, the consumer fails with errors like:
`topics consumer java.lang.IllegalStateException: allTopicPartitionsNumber 2 not equals expected: 5`
---
.../pulsar/client/impl/TopicsConsumerImplTest.java | 45 ++++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 8 +++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 07af862..e108535 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -1219,4 +1220,48 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
}
}
+ @Test(timeOut = testTimeout)
+ public void testPartitionsUpdatesForMultipleTopics() throws Exception {
+ final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
+ final String subName = "my-sub";
+ admin.topics().createPartitionedTopic(topicName0, 2);
+ assertEquals(admin.topics().getPartitionedTopicMetadata(topicName0).partitions, 2);
+
+ PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
+ .topicsPattern("persistent://public/default/test.*")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionName(subName)
+ .subscribe();
+
+ Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 2);
+ Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);
+
+ admin.topics().updatePartitionedTopic(topicName0, 5);
+ consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5);
+ Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 5);
+ });
+
+ final String topicName1 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-1";
+ admin.topics().createPartitionedTopic(topicName1, 3);
+ assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3);
+
+ consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
+
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8);
+ Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8);
+ });
+
+ admin.topics().updatePartitionedTopic(topicName1, 5);
+ consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10);
+ Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10);
+ });
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index cca8f84..6cf6d3d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1172,6 +1172,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
return consumers.values().stream().collect(Collectors.toList());
}
+ // get all partitions that in the topics map
+ int getPartitionsOfTheTopicMap() {
+ return topics.values().stream().mapToInt(Integer::intValue).sum();
+ }
+
@Override
public void pause() {
synchronized (pauseMutex) {
@@ -1246,7 +1251,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
future.complete(null);
return future;
} else if (oldPartitionNumber < currentPartitionNumber) {
- allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber);
+ allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber);
+ topics.put(topicName, currentPartitionNumber);
List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber);
// subscribe new added partitions
List<CompletableFuture<Consumer<T>>> futureList = newPartitions