You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by "poorbarcode (via GitHub)" <gi...@apache.org> on 2023/03/07 09:11:22 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

poorbarcode opened a new pull request, #19737:
URL: https://github.com/apache/pulsar/pull/19737

   ### Motivation
   
   [PIP-240: A new API to unload subscriptions](https://github.com/apache/pulsar/issues/19187)
   
   ### Modifications
   
   - part-1: add a new method `unloadSubscription( String subName )` for `PersistentTopic`
   
   ### Follow-up work (not included in current PR)
   - part-2: add a new method `unloadSubscription( String subName )` for `NonPersistentTopic`
   - part-3: the API `unload subscription` in `v2/PerssitentTopics`
   - part-4: the CMD `unload subscription` in `CmdTopics`
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   - https://github.com/poorbarcode/pulsar/pull/75
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1136504678


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -428,6 +430,26 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        synchronized (ledger) {
+            final PersistentSubscription sub = subscriptions.get(subName);
+            if (sub == null) {

Review Comment:
   if sub == null, may be we should return exception



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class UnloadSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSystemTopicEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "unloadCases")
+    public Object[][] unloadCases (){
+        // [msgCount, enabledBatch, maxMsgPerBatch, subType, ackMsgCount]
+        return new Object[][]{
+                {100, false, 1, Exclusive, 0},
+                {100, false, 1, Failover, 0},
+                {100, false, 1, Shared, 0},
+                {100, false, 1, Key_Shared, 0},
+                {100, true, 5, Exclusive, 0},
+                {100, true, 5, Failover, 0},
+                {100, true, 5, Shared, 0},
+                {100, true, 5, Key_Shared, 0},
+                {100, false, 1, Exclusive, 50},
+                {100, false, 1, Failover, 50},
+                {100, false, 1, Shared, 50},
+                {100, false, 1, Key_Shared, 50},
+                {100, true, 5, Exclusive, 50},
+                {100, true, 5, Failover, 50},
+                {100, true, 5, Shared, 50},
+                {100, true, 5, Key_Shared, 50},
+        };
+    }
+
+    @Test(dataProvider = "unloadCases", invocationCount = 50)
+    public void testSingleConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch, SubscriptionType subType,
+                                   int ackMsgCount) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();
+        final String subName = "sub";
+        ListenerConsumer listenerConsumer = createListener(topicName, subName, subType);
+        ProducerAndMessageIds producerAndMessageIds =
+                createProducerAndSendMessages(topicName, msgCount, enabledBatch, maxMsgPerBatch);
+        log.info("send message-ids:{}-{}", producerAndMessageIds.messageIds.size(),
+                toString(producerAndMessageIds.messageIds));
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> allMessages = listenerConsumer.listener.messageSet;
+            assertEquals(allMessages.size(), msgCount);
+        });
+
+        if (ackMsgCount > 0){
+            List<MessageId> messageIdsToAck = producerAndMessageIds.messageIds.subList(0, ackMsgCount);
+            log.info("ack message-ids: {}", toString(messageIdsToAck));
+            listenerConsumer.consumer.acknowledge(messageIdsToAck);
+        }
+
+        listenerConsumer.listener.messageSet.clear();
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        persistentTopic.unloadSubscription(subName);
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> allMessages = listenerConsumer.listener.messageSet;
+            assertEquals(allMessages.size(), msgCount - ackMsgCount);

Review Comment:
   this assert, when consumer util to msgCount - ackMsgCount the return directly, so we haven check the consumer can't receive the message anymore. same as `testMultiConsumer`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -428,6 +430,26 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        synchronized (ledger) {
+            final PersistentSubscription sub = subscriptions.get(subName);
+            if (sub == null) {
+                return CompletableFuture.completedFuture(null);
+            }
+            if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+                return CompletableFuture.failedFuture(new RestException(Response.Status.BAD_REQUEST,
+                        "Could not reload the compaction subscription"));
+            }
+            // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+            return sub.disconnect().thenAccept(ignore -> {

Review Comment:
   the disconnect is an async method, so why do we need to lock this method by `synchronized (ledger)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155080011


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,

Review Comment:
   I don't think throwing `RestException` in the broker layer is a good approach. RestException should only throw by the web service layer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155330077


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,

Review Comment:
   > The odds are that he will simply use this exception. This will create a large amount of technical debt.
   
   No. Neither of these exceptions should occur because users will not try to unload a sub that does not exist, nor do they try to unload an unsupported Sub which named `__compaction`.
   
   As mentioned above, I share your feeling that the use of an `RestException` exception here is unreasonable, and has now been replaced with a dedicated exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1161364879


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -430,6 +433,51 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    /**
+     * Unload a subscriber.
+     * @throws SubscriptionNotFoundException If subscription not founded.
+     * @throws UnsupportedSubscriptionException If the subscription is typed compaction.
+     * @throws SubscriptionConflictUnloadException Conflict topic-close, topic-delete, another-subscribe-unload,
+     *     cannot unload subscription now
+     */
+    public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(
+                    new SubscriptionNotFoundException(String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(
+                    new UnsupportedSubscriptionException(String.format("Unsupported subscription: %s", subName)));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenCompose(ignore -> {

Review Comment:
   `thenAccept` is fine here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155080468


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){

Review Comment:
   How about the deduplication subscription?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155081762


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {

Review Comment:
   Have you considered calling it concurrently, do we have any problem when using multi-thread?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155330077


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,

Review Comment:
   > The odds are that he will simply use this exception. This will create a large amount of technical debt.
   No. Neither of these exceptions should occur because users will not try to unload a sub that does not exist, nor do they try to unload an unsupported Sub which named `__compaction`.
   
   As mentioned above, I share your feeling that the use of an `RestException` exception here is unreasonable, and has now been replaced with a dedicated exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155085897


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,

Review Comment:
   > I don't think throwing RestException in the broker layer is a good approach. RestException should only throw by the web service layer.
   
   I thought about this and decided to use RestException instead of the others because this method will only be used for Web services now, and we shouldn't over-design for a scenario that doesn't exist. It is better to redesign the exception when there is another requirement to call this method。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155310749


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(new RestException(Response.Status.BAD_REQUEST,
+                    "Could not reload the compaction subscription"));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenAccept(ignore -> {
+            sub.getCursor().rewind();

Review Comment:
   Therefore, is calling this behaviour "rewind cursor" more suitable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155086875


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){

Review Comment:
   Since the deduplication feature just uses a cursor(will not create a subscription), it will not exist in the map `PersistentTopic.subscriptions`, so if call `unload( dedup )` will response "404 Not Found."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155085349


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {

Review Comment:
   It will check by `PersistentTopics` in the next PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1162256669


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1009,17 +1057,15 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
                 }
 
                 Position startPosition = new PositionImpl(ledgerId, entryId);
-                ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
-                            isReadCompacted);
+                    final ManagedCursor cursor = ledger.newNonDurableCursor(startPosition, subscriptionName,
+                            initialPosition, isReadCompacted);
+                    subscriptions.computeIfAbsent(subscriptionName, k ->

Review Comment:
   I feel it is not a necessary change, so I reverted it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155314101


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,

Review Comment:
   >It is better to redesign the exception when there is another requirement to call this method。
   
   Since Apache Pulsar is an open-source project, suppose you hand over this responsibility to the next contributor. The odds are that he will simply use this exception. This will create a large amount of technical debt.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1162250437


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -430,6 +433,51 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    /**
+     * Unload a subscriber.
+     * @throws SubscriptionNotFoundException If subscription not founded.
+     * @throws UnsupportedSubscriptionException If the subscription is typed compaction.
+     * @throws SubscriptionConflictUnloadException Conflict topic-close, topic-delete, another-subscribe-unload,
+     *     cannot unload subscription now
+     */
+    public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(
+                    new SubscriptionNotFoundException(String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(
+                    new UnsupportedSubscriptionException(String.format("Unsupported subscription: %s", subName)));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenCompose(ignore -> {

Review Comment:
   `BrokerServiceException` is not a runtime EX, we need to catch exceptions if use `thenAccept`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode closed pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode closed pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic
URL: https://github.com/apache/pulsar/pull/19737


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155081546


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1008,17 +1029,15 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
                 }
 
                 Position startPosition = new PositionImpl(ledgerId, entryId);
-                ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
-                            isReadCompacted);
+                    final ManagedCursor cursor = ledger.newNonDurableCursor(startPosition, subscriptionName,
+                            initialPosition, isReadCompacted);
+                    subscriptions.computeIfAbsent(subscriptionName, k ->
+                            new PersistentSubscription(this, subscriptionName, cursor, false,
+                            subscriptionProperties));
                 } catch (ManagedLedgerException e) {
                     return FutureUtil.failedFuture(e);
                 }
-

Review Comment:
   Why change here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#issuecomment-1499904492

   After discussing with @congbobo184, we feel it is too heavy to use a map to track the running unload task, I will find another way to do this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155091377


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1008,17 +1029,15 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
                 }
 
                 Position startPosition = new PositionImpl(ledgerId, entryId);
-                ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
-                            isReadCompacted);
+                    final ManagedCursor cursor = ledger.newNonDurableCursor(startPosition, subscriptionName,
+                            initialPosition, isReadCompacted);
+                    subscriptions.computeIfAbsent(subscriptionName, k ->
+                            new PersistentSubscription(this, subscriptionName, cursor, false,
+                            subscriptionProperties));
                 } catch (ManagedLedgerException e) {
                     return FutureUtil.failedFuture(e);
                 }
-

Review Comment:
   Just guarantee the operation `create new sub` and `unload sub` will not be executed concurrently. see https://github.com/apache/pulsar/pull/19737#discussion_r1155090950



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#issuecomment-1503139054

   ## [Codecov](https://codecov.io/gh/apache/pulsar/pull/19737?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#19737](https://codecov.io/gh/apache/pulsar/pull/19737?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ec20578) into [master](https://codecov.io/gh/apache/pulsar/commit/3b118b6bd54cd8e592b8c2cd329a3fa3dfd18a11?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3b118b6) will **increase** coverage by `0.05%`.
   > The diff coverage is `33.33%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/19737/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/19737?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #19737      +/-   ##
   ============================================
   + Coverage     72.86%   72.91%   +0.05%     
   - Complexity    31628    31819     +191     
   ============================================
     Files          1861     1865       +4     
     Lines        137500   138083     +583     
     Branches      15141    15173      +32     
   ============================================
   + Hits         100187   100681     +494     
   - Misses        29351    29397      +46     
   - Partials       7962     8005      +43     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | inttests | `24.16% <0.00%> (-0.23%)` | :arrow_down: |
   | systests | `24.93% <0.00%> (-0.08%)` | :arrow_down: |
   | unittests | `72.20% <33.33%> (+0.07%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19737?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../pulsar/broker/service/BrokerServiceException.java](https://codecov.io/gh/apache/pulsar/pull/19737?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Jyb2tlclNlcnZpY2VFeGNlcHRpb24uamF2YQ==) | `86.84% <0.00%> (-2.25%)` | :arrow_down: |
   | [...sar/broker/service/persistent/PersistentTopic.java](https://codecov.io/gh/apache/pulsar/pull/19737?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=) | `80.08% <40.00%> (+0.04%)` | :arrow_up: |
   
   ... and [158 files with indirect coverage changes](https://codecov.io/gh/apache/pulsar/pull/19737/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155087785


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(new RestException(Response.Status.BAD_REQUEST,
+                    "Could not reload the compaction subscription"));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenAccept(ignore -> {
+            sub.getCursor().rewind();

Review Comment:
   `unloadTopic` will close the `ManagedLedger` and close all cursors, which guarantees that the states of ML and cursors are consistent. Only closing one cursor under the ML will affect the work of ML, which is a dangerous behavior.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155079255


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {

Review Comment:
   Add NPE check for `subName`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1160417473


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -430,6 +443,41 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    /**
+     * Unload a subscriber.
+     * @throws SubscriptionNotFoundException If subscription not founded.
+     * @throws UnsupportedSubscriptionException If the subscription is typed compaction.
+     */
+    public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(
+                    new SubscriptionNotFoundException(String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(
+                    new UnsupportedSubscriptionException(String.format("Unsupported subscription: %s", subName)));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        CompletableFuture<Void> result = unloadSubscriptionFutures.computeIfAbsent(subName,
+                k -> sub.disconnect().thenAccept(ignore -> {

Review Comment:
   Why do we need to store `unloadSubscription` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode merged pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode merged PR #19737:
URL: https://github.com/apache/pulsar/pull/19737


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1161365306


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1009,17 +1057,15 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
                 }
 
                 Position startPosition = new PositionImpl(ledgerId, entryId);
-                ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
-                            isReadCompacted);
+                    final ManagedCursor cursor = ledger.newNonDurableCursor(startPosition, subscriptionName,
+                            initialPosition, isReadCompacted);
+                    subscriptions.computeIfAbsent(subscriptionName, k ->

Review Comment:
   Does it look like `subscriptions.computeIfAbsent` has changed the previous logic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1142197202


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -428,6 +430,26 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        synchronized (ledger) {
+            final PersistentSubscription sub = subscriptions.get(subName);
+            if (sub == null) {
+                return CompletableFuture.completedFuture(null);
+            }
+            if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+                return CompletableFuture.failedFuture(new RestException(Response.Status.BAD_REQUEST,
+                        "Could not reload the compaction subscription"));
+            }
+            // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+            return sub.disconnect().thenAccept(ignore -> {

Review Comment:
   Already remove this lock



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1151374369


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -428,6 +430,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription % not found", subName)));

Review Comment:
   ```suggestion
                       String.format("Subscription %s not found", subName)));
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class UnloadSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSystemTopicEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "unloadCases")
+    public Object[][] unloadCases (){
+        // [msgCount, enabledBatch, maxMsgPerBatch, subType, ackMsgCount]
+        return new Object[][]{
+                {100, false, 1, Exclusive, 0},
+                {100, false, 1, Failover, 0},
+                {100, false, 1, Shared, 0},
+                {100, false, 1, Key_Shared, 0},
+                {100, true, 5, Exclusive, 0},
+                {100, true, 5, Failover, 0},
+                {100, true, 5, Shared, 0},
+                {100, true, 5, Key_Shared, 0},
+                {100, false, 1, Exclusive, 50},
+                {100, false, 1, Failover, 50},
+                {100, false, 1, Shared, 50},
+                {100, false, 1, Key_Shared, 50},
+                {100, true, 5, Exclusive, 50},
+                {100, true, 5, Failover, 50},
+                {100, true, 5, Shared, 50},
+                {100, true, 5, Key_Shared, 50},
+        };
+    }
+
+    @Test(dataProvider = "unloadCases", invocationCount = 50)
+    public void testSingleConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch, SubscriptionType subType,
+                                   int ackMsgCount) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();

Review Comment:
   ```suggestion
           final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class UnloadSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSystemTopicEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "unloadCases")
+    public Object[][] unloadCases (){
+        // [msgCount, enabledBatch, maxMsgPerBatch, subType, ackMsgCount]
+        return new Object[][]{
+                {100, false, 1, Exclusive, 0},
+                {100, false, 1, Failover, 0},
+                {100, false, 1, Shared, 0},
+                {100, false, 1, Key_Shared, 0},
+                {100, true, 5, Exclusive, 0},
+                {100, true, 5, Failover, 0},
+                {100, true, 5, Shared, 0},
+                {100, true, 5, Key_Shared, 0},
+                {100, false, 1, Exclusive, 50},
+                {100, false, 1, Failover, 50},
+                {100, false, 1, Shared, 50},
+                {100, false, 1, Key_Shared, 50},
+                {100, true, 5, Exclusive, 50},
+                {100, true, 5, Failover, 50},
+                {100, true, 5, Shared, 50},
+                {100, true, 5, Key_Shared, 50},
+        };
+    }
+
+    @Test(dataProvider = "unloadCases", invocationCount = 50)
+    public void testSingleConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch, SubscriptionType subType,
+                                   int ackMsgCount) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();
+        final String subName = "sub";
+        Consumer<String> consumer = createConsumer(topicName, subName, subType);
+        ProducerAndMessageIds producerAndMessageIds =
+                createProducerAndSendMessages(topicName, msgCount, enabledBatch, maxMsgPerBatch);
+        log.info("send message-ids:{}-{}", producerAndMessageIds.messageIds.size(),
+                toString(producerAndMessageIds.messageIds));
+
+        // Receive all messages and ack some.
+        MessagesEntry messagesEntry = receiveAllMessages(consumer);
+        assertEquals(messagesEntry.messageSet.size(), msgCount);
+        if (ackMsgCount > 0){
+            LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
+            Iterator<MessageId> messageIdIterator = messagesEntry.messageIdSet.iterator();
+            for (int i = ackMsgCount; i > 0; i--){
+                ackedMessageIds.add(messageIdIterator.next());
+            }
+            consumer.acknowledge(ackedMessageIds.stream().toList());
+            log.info("ack message-ids: {}", toString(ackedMessageIds.stream().toList()));
+        }
+
+
+        // Unload subscriber.
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        persistentTopic.unloadSubscription(subName);
+        // Receive all messages for the second time.
+        MessagesEntry messagesEntryForTheSecondTime = receiveAllMessages(consumer);
+        log.info("received message-ids for the second time: {}",
+                toString(messagesEntryForTheSecondTime.messageIdSet.stream().toList()));
+        assertEquals(messagesEntryForTheSecondTime.messageSet.size(), msgCount - ackMsgCount);
+
+        // cleanup.
+        producerAndMessageIds.producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    @Test(dataProvider = "unloadCases", invocationCount = 50)
+    public void testMultiConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch, SubscriptionType subType,
+                                  int ackMsgCount) throws Exception {
+        if (subType == Exclusive){
+            return;
+        }
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();

Review Comment:
   ```suggestion
           final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155090950


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {

Review Comment:
   - `create new sub` & `unload sub`:  `create a new sub` is executed only when there is no sub, and `unload sub` is performed only when there is a sub. No concurrent calls can occur
   - `unload sub` & `unload sub`: I added a concurrent Map to prevent reentrant
   - `unload sub` & `close topic`: use `PersistentTopic.lock` to make sure it's correct
   
   I missed the `2&3` two scenes earlier. Thanks for reminding me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155090992


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155330077


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,

Review Comment:
   > The odds are that he will simply use this exception. This will create a large amount of technical debt.
   
   As mentioned above, I also feel that the use of a `RestException` exception here is unreasonable, so replaced it with a dedicated exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155080468


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){

Review Comment:
   Deduplication subscription?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){

Review Comment:
   How about deduplication subscription?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1142196807


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -428,6 +430,26 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        synchronized (ledger) {
+            final PersistentSubscription sub = subscriptions.get(subName);
+            if (sub == null) {

Review Comment:
   Good suggestion. fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1142199277


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class UnloadSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSystemTopicEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "unloadCases")
+    public Object[][] unloadCases (){
+        // [msgCount, enabledBatch, maxMsgPerBatch, subType, ackMsgCount]
+        return new Object[][]{
+                {100, false, 1, Exclusive, 0},
+                {100, false, 1, Failover, 0},
+                {100, false, 1, Shared, 0},
+                {100, false, 1, Key_Shared, 0},
+                {100, true, 5, Exclusive, 0},
+                {100, true, 5, Failover, 0},
+                {100, true, 5, Shared, 0},
+                {100, true, 5, Key_Shared, 0},
+                {100, false, 1, Exclusive, 50},
+                {100, false, 1, Failover, 50},
+                {100, false, 1, Shared, 50},
+                {100, false, 1, Key_Shared, 50},
+                {100, true, 5, Exclusive, 50},
+                {100, true, 5, Failover, 50},
+                {100, true, 5, Shared, 50},
+                {100, true, 5, Key_Shared, 50},
+        };
+    }
+
+    @Test(dataProvider = "unloadCases", invocationCount = 50)
+    public void testSingleConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch, SubscriptionType subType,
+                                   int ackMsgCount) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();
+        final String subName = "sub";
+        ListenerConsumer listenerConsumer = createListener(topicName, subName, subType);
+        ProducerAndMessageIds producerAndMessageIds =
+                createProducerAndSendMessages(topicName, msgCount, enabledBatch, maxMsgPerBatch);
+        log.info("send message-ids:{}-{}", producerAndMessageIds.messageIds.size(),
+                toString(producerAndMessageIds.messageIds));
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> allMessages = listenerConsumer.listener.messageSet;
+            assertEquals(allMessages.size(), msgCount);
+        });
+
+        if (ackMsgCount > 0){
+            List<MessageId> messageIdsToAck = producerAndMessageIds.messageIds.subList(0, ackMsgCount);
+            log.info("ack message-ids: {}", toString(messageIdsToAck));
+            listenerConsumer.consumer.acknowledge(messageIdsToAck);
+        }
+
+        listenerConsumer.listener.messageSet.clear();
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        persistentTopic.unloadSubscription(subName);
+        Awaitility.await().untilAsserted(() -> {
+            Set<String> allMessages = listenerConsumer.listener.messageSet;
+            assertEquals(allMessages.size(), msgCount - ackMsgCount);

Review Comment:
   After covering so many scenarios, it is possible to get duplicate messages. I changed the implementation to receive all messages before the verification
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155081401


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(new RestException(Response.Status.BAD_REQUEST,
+                    "Could not reload the compaction subscription"));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenAccept(ignore -> {
+            sub.getCursor().rewind();

Review Comment:
   It looks like `unloadSubscription` just rewinds the cursor. Why we can't close it directly like `unloadTopic`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155079358


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {

Review Comment:
   IMO, it is a good idea to add a comment here to tell the caller which exception we will throw.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1155329427


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -429,6 +431,25 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    public CompletableFuture<Void> unloadSubscription(String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(new RestException(Response.Status.NOT_FOUND,
+                    String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(new RestException(Response.Status.BAD_REQUEST,
+                    "Could not reload the compaction subscription"));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenAccept(ignore -> {
+            sub.getCursor().rewind();

Review Comment:
   No, this operation did this two things:
   - recreate the subscription: This will clear all attributes of the Subscription, which will help resolve the problem of Subscription due to incorrect attributes quickly.
   - rewind cursor
   We can't just call it `rewindCursor`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1161365306


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1009,17 +1057,15 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
                 }
 
                 Position startPosition = new PositionImpl(ledgerId, entryId);
-                ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
-                            isReadCompacted);
+                    final ManagedCursor cursor = ledger.newNonDurableCursor(startPosition, subscriptionName,
+                            initialPosition, isReadCompacted);
+                    subscriptions.computeIfAbsent(subscriptionName, k ->

Review Comment:
   Does it look like `subscriptions.computeIfAbsent` has changed the previous logic? Because the previous logic is `put` anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1160418196


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -430,6 +443,41 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    /**
+     * Unload a subscriber.
+     * @throws SubscriptionNotFoundException If subscription not founded.
+     * @throws UnsupportedSubscriptionException If the subscription is typed compaction.
+     */
+    public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(
+                    new SubscriptionNotFoundException(String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())){
+            return CompletableFuture.failedFuture(
+                    new UnsupportedSubscriptionException(String.format("Unsupported subscription: %s", subName)));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        CompletableFuture<Void> result = unloadSubscriptionFutures.computeIfAbsent(subName,
+                k -> sub.disconnect().thenAccept(ignore -> {

Review Comment:
   Just make it will work well if many tasks running concurrently, see: https://github.com/apache/pulsar/pull/19737#discussion_r1155090950



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19737: [improve] [broker] PIP-240: new public method unloadSubscription in PersistentTopic

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19737:
URL: https://github.com/apache/pulsar/pull/19737#discussion_r1162251986


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1009,17 +1057,15 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
                 }
 
                 Position startPosition = new PositionImpl(ledgerId, entryId);
-                ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
-                            isReadCompacted);
+                    final ManagedCursor cursor = ledger.newNonDurableCursor(startPosition, subscriptionName,
+                            initialPosition, isReadCompacted);
+                    subscriptions.computeIfAbsent(subscriptionName, k ->

Review Comment:
   > Does it look like subscriptions.computeIfAbsent has changed the previous logic?
   
   No. the original logic is `synchronized: put if not exists.` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org