You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/06 01:25:30 UTC

[pulsar] branch master updated: Message replays on Key shared subscription are breaking ordering (#7108)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c0260e9  Message replays on Key shared subscription are breaking ordering (#7108)
c0260e9 is described below

commit c0260e9cb26010e16d6c947f060dc999a689f94a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 5 18:25:20 2020 -0700

    Message replays on Key shared subscription are breaking ordering (#7108)
---
 .../client/api/KeySharedSubscriptionTest.java      | 57 ++++++++++++++++++++++
 .../collections/ConcurrentSortedLongPairSet.java   | 13 ++---
 2 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4ae57b3..8c6c033 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -598,6 +599,62 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         assertTrue(readPosition.getEntryId() < 1000);
     }
 
+    @Test
+    public void testRemoveFirstConsumer() throws Exception {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, false);
+
+        @Cleanup
+        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(10)
+                .consumerName("c1")
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // C1 is the only consumer so far, so all the already published messages will be pre-fetched by C1.
+
+        // Add a second consumer (C2) to the same Key_Shared subscription.
+        @Cleanup
+        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(10)
+                .consumerName("c2")
+                .subscribe();
+
+        for (int i = 10; i < 20; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // C2 will not be able to receive any messages until C1 is done processing whatever it got prefetched
+        assertNull(c2.receive(100, TimeUnit.MILLISECONDS));
+
+        c1.close();
+
+        // Now that C1 is closed, C2 must get all 20 messages (values 0..19), in the order they were published
+        for (int i = 0; i < 20; i++) {
+            Message<Integer> msg = c2.receive();
+            assertEquals(msg.getValue().intValue(), i);
+            c2.acknowledge(msg);
+        }
+    }
+
     private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
         Producer<Integer> producer = null;
         if (enableBatch) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index 7df8f0f..b1eb331 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.util.collections;
 
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -130,17 +131,13 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
 
     @Override
     public <T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter) {
-        Set<T> items = new TreeSet<>();
-        AtomicInteger count = new AtomicInteger(0);
+        NavigableSet<T> items = new TreeSet<>();
         for (Long item1 : longPairSets.navigableKeySet()) {
-            if (count.get() >= numberOfItems) {// already found set of positions
-                break;
-            }
             ConcurrentLongPairSet messagesToReplay = longPairSets.get(item1);
             messagesToReplay.forEach((i1, i2) -> {
-                if (count.get() < numberOfItems) {
-                    items.add(longPairConverter.apply(i1, i2));
-                    count.incrementAndGet();
+                items.add(longPairConverter.apply(i1, i2));
+                if (items.size() > numberOfItems) {
+                    items.pollLast();
                 }
             });
         }