You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/04/11 13:38:41 UTC

[pulsar] branch master updated: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic (#19737)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd2aa550d0f [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic (#19737)
cd2aa550d0f is described below

commit cd2aa550d0fe4e72b5ff88c4f6c1c2795b3ff2cd
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Apr 11 21:38:32 2023 +0800

    [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic (#19737)
    
    part-1 of PIP-240: add a new method unloadSubscription( String subName ) for PersistentTopic
---
 .../broker/service/BrokerServiceException.java     |  12 +
 .../broker/service/persistent/PersistentTopic.java |  48 ++++
 .../pulsar/client/api/UnloadSubscriptionTest.java  | 265 +++++++++++++++++++++
 3 files changed, 325 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index fd3a391bca3..3e77588b245 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -160,6 +160,18 @@ public class BrokerServiceException extends Exception {
         }
     }
 
+    public static class UnsupportedSubscriptionException extends BrokerServiceException {
+        public UnsupportedSubscriptionException(String msg) {
+            super(msg);
+        }
+    }
+
+    public static class SubscriptionConflictUnloadException extends BrokerServiceException {
+        public SubscriptionConflictUnloadException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class SubscriptionBusyException extends BrokerServiceException {
         public SubscriptionBusyException(String msg) {
             super(msg);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0374fc98212..0f5e6043981 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import lombok.Getter;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -94,12 +95,14 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionConflictUnloadException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBacklogQuotaExceededException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
+import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedSubscriptionException;
 import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -430,6 +433,51 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return pendingWriteOps;
     }
 
+    /**
+     * Unload a subscriber.
+     * @throws SubscriptionNotFoundException If subscription not founded.
+     * @throws UnsupportedSubscriptionException If the subscription is typed compaction.
+     * @throws SubscriptionConflictUnloadException Conflict topic-close, topic-delete, another-subscribe-unload,
+     *     cannot unload subscription now
+     */
+    public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(
+                    new SubscriptionNotFoundException(String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(
+                    new UnsupportedSubscriptionException(String.format("Unsupported subscription: %s", subName)));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenCompose(ignore -> {
+            if (!lock.writeLock().tryLock()) {
+                return CompletableFuture.failedFuture(new SubscriptionConflictUnloadException(String.format("Conflict"
+                        + " topic-close, topic-delete, another-subscribe-unload, cannot unload subscription %s now",
+                        topic, subName)));
+            }
+            try {
+                if (isFenced) {
+                    return CompletableFuture.failedFuture(new TopicFencedException(String.format(
+                            "Topic[%s] is fenced, can not unload subscription %s now", topic, subName)));
+                }
+                if (sub != subscriptions.get(subName)) {
+                    // Another task already finished.
+                    return CompletableFuture.failedFuture(new SubscriptionConflictUnloadException(String.format(
+                            "Another unload subscriber[%s] has been finished, do not repeat call.", subName)));
+                }
+                sub.getCursor().rewind();
+                PersistentSubscription subNew = PersistentTopic.this.createPersistentSubscription(sub.getName(),
+                        sub.getCursor(), sub.isReplicated(), sub.getSubscriptionProperties());
+                subscriptions.put(subName, subNew);
+                return CompletableFuture.completedFuture(null);
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
+    }
+
     private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
             boolean replicated, Map<String, String> subscriptionProperties) {
         Objects.requireNonNull(compactedTopic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
new file mode 100644
index 00000000000..93d5bf30ec6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class UnloadSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSystemTopicEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "unloadCases")
+    public Object[][] unloadCases (){
+        // [msgCount, enabledBatch, maxMsgPerBatch, subType, ackMsgCount]
+        return new Object[][]{
+                {100, false, 1, Exclusive, 0},
+                {100, false, 1, Failover, 0},
+                {100, false, 1, Shared, 0},
+                {100, false, 1, Key_Shared, 0},
+                {100, true, 5, Exclusive, 0},
+                {100, true, 5, Failover, 0},
+                {100, true, 5, Shared, 0},
+                {100, true, 5, Key_Shared, 0},
+                {100, false, 1, Exclusive, 50},
+                {100, false, 1, Failover, 50},
+                {100, false, 1, Shared, 50},
+                {100, false, 1, Key_Shared, 50},
+                {100, true, 5, Exclusive, 50},
+                {100, true, 5, Failover, 50},
+                {100, true, 5, Shared, 50},
+                {100, true, 5, Key_Shared, 50},
+        };
+    }
+
+    @Test(dataProvider = "unloadCases")
+    public void testSingleConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch, SubscriptionType subType,
+                                   int ackMsgCount) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
+        final String subName = "sub";
+        Consumer<String> consumer = createConsumer(topicName, subName, subType);
+        ProducerAndMessageIds producerAndMessageIds =
+                createProducerAndSendMessages(topicName, msgCount, enabledBatch, maxMsgPerBatch);
+        log.info("send message-ids:{}-{}", producerAndMessageIds.messageIds.size(),
+                toString(producerAndMessageIds.messageIds));
+
+        // Receive all messages and ack some.
+        MessagesEntry messagesEntry = receiveAllMessages(consumer);
+        assertEquals(messagesEntry.messageSet.size(), msgCount);
+        if (ackMsgCount > 0){
+            LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
+            Iterator<MessageId> messageIdIterator = messagesEntry.messageIdSet.iterator();
+            for (int i = ackMsgCount; i > 0; i--){
+                ackedMessageIds.add(messageIdIterator.next());
+            }
+            consumer.acknowledge(ackedMessageIds.stream().toList());
+            log.info("ack message-ids: {}", toString(ackedMessageIds.stream().toList()));
+        }
+
+
+        // Unload subscriber.
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        persistentTopic.unloadSubscription(subName);
+        // Receive all messages for the second time.
+        MessagesEntry messagesEntryForTheSecondTime = receiveAllMessages(consumer);
+        log.info("received message-ids for the second time: {}",
+                toString(messagesEntryForTheSecondTime.messageIdSet.stream().toList()));
+        assertEquals(messagesEntryForTheSecondTime.messageSet.size(), msgCount - ackMsgCount);
+
+        // cleanup.
+        producerAndMessageIds.producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    @Test(dataProvider = "unloadCases")
+    public void testMultiConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch, SubscriptionType subType,
+                                  int ackMsgCount) throws Exception {
+        if (subType == Exclusive){
+            return;
+        }
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
+        final String subName = "sub";
+        Consumer<String> consumer1 = createConsumer(topicName, subName, subType);
+        Consumer<String> consumer2 = createConsumer(topicName, subName, subType);
+        ProducerAndMessageIds producerAndMessageIds =
+                createProducerAndSendMessages(topicName, msgCount, enabledBatch, maxMsgPerBatch);
+        log.info("send message-ids:{}-{}", producerAndMessageIds.messageIds.size(),
+                toString(producerAndMessageIds.messageIds));
+
+        // Receive all messages and ack some.
+        MessagesEntry messagesEntry1 = receiveAllMessages(consumer1);
+        MessagesEntry messagesEntry2 = receiveAllMessages(consumer2);
+        LinkedHashSet<String> allMessages = new LinkedHashSet<>();
+        allMessages.addAll(messagesEntry1.messageSet);
+        allMessages.addAll(messagesEntry2.messageSet);
+        assertEquals(allMessages.size(), msgCount);
+        if (ackMsgCount > 0){
+            LinkedHashSet<MessageId> allMessageIds = new LinkedHashSet<>();
+            LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
+            allMessageIds.addAll(messagesEntry1.messageIdSet);
+            allMessageIds.addAll(messagesEntry2.messageIdSet);
+            Iterator<MessageId> messageIdIterator = allMessageIds.iterator();
+            for (int i = ackMsgCount; i > 0; i--){
+                ackedMessageIds.add(messageIdIterator.next());
+            }
+            consumer1.acknowledge(ackedMessageIds.stream().toList());
+            log.info("ack message-ids: {}", toString(ackedMessageIds.stream().toList()));
+        }
+
+        // Unload subscriber.
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        persistentTopic.unloadSubscription(subName);
+
+        // Receive all messages for the second time.
+        MessagesEntry messagesEntryForTheSecondTime1 = receiveAllMessages(consumer1);
+        MessagesEntry messagesEntryForTheSecondTime2 = receiveAllMessages(consumer2);
+        LinkedHashSet<String> allMessagesForTheSecondTime = new LinkedHashSet<>();
+        allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime1.messageSet);
+        allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime2.messageSet);
+        LinkedHashSet<MessageId> allMessageIdsForTheSecondTime = new LinkedHashSet<>();
+        allMessageIdsForTheSecondTime.addAll(messagesEntry1.messageIdSet);
+        allMessageIdsForTheSecondTime.addAll(messagesEntry2.messageIdSet);
+        log.info("received message-ids for the second time: {}",
+                toString(allMessageIdsForTheSecondTime.stream().toList()));
+        assertEquals(allMessagesForTheSecondTime.size(), msgCount - ackMsgCount);
+
+        // cleanup.
+        producerAndMessageIds.producer.close();
+        consumer1.close();
+        consumer2.close();
+        admin.topics().delete(topicName);
+    }
+
+    private static String toString(List<MessageId> messageIds){
+        List<String> messageIdStrings = new ArrayList<>(messageIds.size());
+        for (MessageId messageId : messageIds){
+            MessageIdImpl messageIdImpl;
+            if (messageId instanceof TopicMessageIdImpl) {
+                TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+                messageIdImpl = (MessageIdImpl) topicMessageId.getInnerMessageId();
+            } else {
+                messageIdImpl = (MessageIdImpl) messageId;
+            }
+            StringBuilder stringBuilder = new StringBuilder(String.valueOf(messageIdImpl.getEntryId()));
+            if (messageIdImpl instanceof BatchMessageIdImpl){
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageIdImpl;
+                stringBuilder.append("_")
+                        .append(batchMessageId.getBatchIndex())
+                        .append("/")
+                        .append(batchMessageId.getBatchSize());
+            }
+            messageIdStrings.add(stringBuilder.toString());
+        }
+        return messageIdStrings.toString();
+    }
+
+    private PersistentTopic getPersistentTopic(String topicName) {
+        return (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+    }
+
+    private ProducerAndMessageIds createProducerAndSendMessages(String topicName, int msgCount, boolean enabledBatch,
+                                                           int maxMsgPerBatch) throws Exception {
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .batchingMaxMessages(maxMsgPerBatch)
+                .create();
+        ArrayList<CompletableFuture<MessageId>> messageIds = new ArrayList<>();
+        for (int i = 0; i < msgCount; i++) {
+            messageIds.add(producer.newMessage().key(String.valueOf(i % 10)).value(String.valueOf(i)).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).join();
+        return new ProducerAndMessageIds(producer,
+                messageIds.stream().map(CompletableFuture::join).collect(Collectors.toList()));
+    }
+
+    private record ProducerAndMessageIds(Producer<String> producer, List<MessageId> messageIds) {}
+
+    private Consumer<String> createConsumer(String topicName, String subName, SubscriptionType subType)
+            throws Exception {
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(subType)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        return consumer;
+    }
+
+    private MessagesEntry receiveAllMessages(Consumer<String> consumer) throws Exception {
+        final Set<String> messageSet = Collections.synchronizedSet(new LinkedHashSet<>());
+        final Set<MessageId> messageIdSet = Collections.synchronizedSet(new LinkedHashSet<>());
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null){
+                break;
+            }
+            messageIdSet.add(msg.getMessageId());
+            messageSet.add(msg.getValue());
+        }
+        return new MessagesEntry(messageSet, messageIdSet);
+    }
+
+    private record MessagesEntry(Set<String> messageSet, Set<MessageId> messageIdSet) {}
+
+}
\ No newline at end of file