You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/06 01:25:30 UTC
[pulsar] branch master updated: Message replays on Key shared subscription are breaking ordering (#7108)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c0260e9 Message replays on Key shared subscription are breaking ordering (#7108)
c0260e9 is described below
commit c0260e9cb26010e16d6c947f060dc999a689f94a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 5 18:25:20 2020 -0700
Message replays on Key shared subscription are breaking ordering (#7108)
---
.../client/api/KeySharedSubscriptionTest.java | 57 ++++++++++++++++++++++
.../collections/ConcurrentSortedLongPairSet.java | 13 ++---
2 files changed, 62 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4ae57b3..8c6c033 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -598,6 +599,62 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
assertTrue(readPosition.getEntryId() < 1000);
}
+ @Test
+ public void testRemoveFirstConsumer() throws Exception {
+ this.conf.setSubscriptionKeySharedEnable(true);
+ String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
+ // Scenario: C1 pre-fetches messages without acknowledging them, C2 joins, then C1 is closed; C2 must then receive every message in publish order.
+ @Cleanup
+ Producer<Integer> producer = createProducer(topic, false);
+
+ @Cleanup
+ Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .receiverQueueSize(10)
+ .consumerName("c1")
+ .subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(i)
+ .send();
+ }
+
+ // C1 is the only consumer so far and its receiver queue size (10) equals the number of messages
+ // published above, so all of them are expected to be pre-fetched by C1 (which never acknowledges them).
+ // Add a second consumer (C2) on the same Key_Shared subscription.
+ @Cleanup
+ Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .receiverQueueSize(10)
+ .consumerName("c2")
+ .subscribe();
+
+ for (int i = 10; i < 20; i++) {
+ producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(i)
+ .send();
+ }
+
+ // C2 must not receive any message until C1 is done processing whatever it has pre-fetched.
+ assertNull(c2.receive(100, TimeUnit.MILLISECONDS));
+
+ c1.close();
+
+ // With C1 closed, C2 is expected to receive all 20 messages, in publish order (values 0 through 19).
+ for (int i = 0; i < 20; i++) {
+ Message<Integer> msg = c2.receive();
+ assertEquals(msg.getValue().intValue(), i);
+ c2.acknowledge(msg);
+ }
+ }
+
private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
Producer<Integer> producer = null;
if (enableBatch) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index 7df8f0f..b1eb331 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.util.collections;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -130,17 +131,13 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
@Override
public <T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter) {
- Set<T> items = new TreeSet<>();
- AtomicInteger count = new AtomicInteger(0);
+ NavigableSet<T> items = new TreeSet<>();
for (Long item1 : longPairSets.navigableKeySet()) {
- if (count.get() >= numberOfItems) {// already found set of positions
- break;
- }
ConcurrentLongPairSet messagesToReplay = longPairSets.get(item1);
messagesToReplay.forEach((i1, i2) -> {
- if (count.get() < numberOfItems) {
- items.add(longPairConverter.apply(i1, i2));
- count.incrementAndGet();
+ items.add(longPairConverter.apply(i1, i2));
+ if (items.size() > numberOfItems) {
+ items.pollLast();
}
});
}