You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/13 11:36:34 UTC

[pulsar] branch master updated: Expose consumer names after the mark delete position for the Key_Shared subscription (#8545)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6867262  Expose consumer names after the mark delete position for the Key_Shared subscription (#8545)
6867262 is described below

commit 686726272a76856bb55e3b741e245d05fe2e0274
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 13 19:36:01 2020 +0800

    Expose consumer names after the mark delete position for the Key_Shared subscription (#8545)
    
    ### Motivation
    
    1. Expose consumer names after the mark delete position for the Key_Shared subscription.
    2. Remove the consumer from recentlyJoinedConsumers based on the next valid position after the mark delete position. Previously, we used position.nextPosition to decide whether to remove the consumer from recentlyJoinedConsumers, but this can prevent consumers from being removed properly. For example, if a ledger rollover occurs, the mark delete position is the last position of the old ledger, and the max read position is the first position of the new ledger, then in this situation we should remove the con [...]
    
    So we should get the valid next position for the mark delete position.
    
    Related to #8499
---
 pom.xml                                            |  8 ++
 pulsar-broker/pom.xml                              |  6 ++
 ...istentStickyKeyDispatcherMultipleConsumers.java | 24 ++++--
 .../service/persistent/PersistentSubscription.java | 10 +++
 .../broker/service/PersistentQueueE2ETest.java     | 19 ++---
 .../pulsar/broker/stats/SubscriptionStatsTest.java | 94 ++++++++++++++++++++++
 .../common/policies/data/SubscriptionStats.java    |  9 +++
 7 files changed, 153 insertions(+), 17 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6e0faaf..7ef22c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -193,6 +193,7 @@ flexible messaging model and an intuitive client API.</description>
     <skyscreamer.version>1.5.0</skyscreamer.version>
     <confluent.version>5.2.2</confluent.version>
     <objenesis.version>3.1</objenesis.version>
+    <awaitility.version>4.0.2</awaitility.version>
 
     <!-- Plugin dependencies -->
     <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -248,6 +249,13 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
+        <groupId>org.awaitility</groupId>
+        <artifactId>awaitility</artifactId>
+        <version>${awaitility.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-core</artifactId>
         <version>${mockito.version}</version>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 635143d..7aa742b 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -169,6 +169,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <!-- functions related dependencies (end) -->
 
     <dependency>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3658567..56963e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -325,15 +326,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private boolean removeConsumersFromRecentJoinedConsumers() {
         Iterator<Map.Entry<Consumer, PositionImpl>> itr = recentlyJoinedConsumers.entrySet().iterator();
-        PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
         boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
-        while (itr.hasNext()) {
-            Map.Entry<Consumer, PositionImpl> entry = itr.next();
-            if (entry.getValue().compareTo(mdp) <= 0) {
-                itr.remove();
-                hasConsumerRemovedFromTheRecentJoinedConsumers = true;
-            } else {
-                break;
+        PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
+        if (mdp != null) {
+            PositionImpl nextPositionOfTheMarkDeletePosition = ((ManagedLedgerImpl)cursor.getManagedLedger()).getNextValidPosition(mdp);
+            while (itr.hasNext()) {
+                Map.Entry<Consumer, PositionImpl> entry = itr.next();
+                if (entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) {
+                    itr.remove();
+                    hasConsumerRemovedFromTheRecentJoinedConsumers = true;
+                } else {
+                    break;
+                }
             }
         }
         return hasConsumerRemovedFromTheRecentJoinedConsumers;
@@ -361,6 +365,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
     }
 
+    public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
+        return recentlyJoinedConsumers;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 96c1289..00c8a58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -933,6 +934,15 @@ public class PersistentSubscription implements Subscription {
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
         subStats.isReplicated = isReplicated();
         subStats.isDurable = cursor.isDurable();
+        if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
+            LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers =
+                    ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getRecentlyJoinedConsumers();
+            if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
+                recentlyJoinedConsumers.forEach((k, v) -> {
+                    subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
+                });
+            }
+        }
         return subStats;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index cf334d5..1e468ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -493,15 +494,15 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
             consumer1.acknowledge(msgId);
         }
 
-        TopicStats stats = admin.topics().getStats(topicName);
-
-        // Unacked messages count should be 0 for both consumers at this point
-        SubscriptionStats subStats = stats.subscriptions.get(subName);
-        assertEquals(subStats.msgBacklog, 0);
-
-        for (ConsumerStats cs : subStats.consumers) {
-            assertEquals(cs.unackedMessages, 0);
-        }
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            TopicStats stats = admin.topics().getStats(topicName);
+            // Unacked messages count should be 0 for both consumers at this point
+            SubscriptionStats subStats = stats.subscriptions.get(subName);
+            assertEquals(subStats.msgBacklog, 0);
+            for (ConsumerStats cs : subStats.consumers) {
+                assertEquals(cs.unackedMessages, 0);
+            }
+        });
 
         producer.close();
         consumer1.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
new file mode 100644
index 0000000..e441e19
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+
+@Slf4j
+public class SubscriptionStatsTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testConsumersAfterMarkDelete() throws PulsarClientException, PulsarAdminException {
+        final String topicName = "persistent://my-property/my-ns/testConsumersAfterMarkDelete-"
+                + UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .receiverQueueSize(10)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        // Receive by do not ack the message, so that the next consumer can added to the recentJoinedConsumer of the dispatcher.
+        consumer1.receive();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .receiverQueueSize(10)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        TopicStats stats = admin.topics().getStats(topicName);
+        Assert.assertEquals(stats.subscriptions.size(), 1);
+        Assert.assertEquals(stats.subscriptions.entrySet().iterator().next().getValue()
+                .consumersAfterMarkDeletePosition.size(), 1);
+
+        consumer1.close();
+        consumer2.close();
+        producer.close();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 18002b5..e5e6feb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -21,7 +21,11 @@ package org.apache.pulsar.common.policies.data;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Lists;
+
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 
 /**
@@ -91,8 +95,12 @@ public class SubscriptionStats {
     /** Mark that the subscription state is kept in sync across different regions. */
     public boolean isReplicated;
 
+    /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */
+    public Map<String, String> consumersAfterMarkDeletePosition;
+
     public SubscriptionStats() {
         this.consumers = Lists.newArrayList();
+        this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
     }
 
     public void reset() {
@@ -134,6 +142,7 @@ public class SubscriptionStats {
                 this.consumers.get(i).add(stats.consumers.get(i));
             }
         }
+        this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
         return this;
     }
 }