You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/11/28 22:12:58 UTC

[pulsar] branch master updated: Remove cursor while remove non-durable subscription (#5719)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e30c08  Remove cursor while remove non-durable subscription (#5719)
2e30c08 is described below

commit 2e30c086b2461531c62164d09ea148928a0e3ae6
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 29 06:12:49 2019 +0800

    Remove cursor while remove non-durable subscription (#5719)
    
    ### Motivation
    
    Remove the cursor from the managed ledger's cursors when a non-durable subscription is removed. Data deletion depends on the mark-delete position of all cursors; if an unused cursor is left in the managed ledger's cursors, data can't be deleted as expected.
    
    ### Modifications
    
    Remove the cursor when removing a non-durable subscription.
---
 .../AbstractDispatcherSingleActiveConsumer.java    |  4 +-
 .../org/apache/pulsar/broker/service/Consumer.java | 12 +++++-
 .../apache/pulsar/broker/service/Dispatcher.java   |  6 ++-
 .../apache/pulsar/broker/service/Subscription.java |  6 ++-
 .../nonpersistent/NonPersistentDispatcher.java     |  4 +-
 .../NonPersistentDispatcherMultipleConsumers.java  |  2 +-
 .../nonpersistent/NonPersistentSubscription.java   |  2 +-
 .../PersistentDispatcherMultipleConsumers.java     |  4 +-
 .../service/persistent/PersistentSubscription.java | 14 ++++++-
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 44 +++++++++++++++++++---
 10 files changed, 79 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 02fcea1..509c3e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -219,11 +219,11 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
      *
      * @return
      */
-    public synchronized CompletableFuture<Void> disconnectAllConsumers() {
+    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
         closeFuture = new CompletableFuture<>();
 
         if (!consumers.isEmpty()) {
-            consumers.forEach(Consumer::disconnect);
+            consumers.forEach(consumer -> consumer.disconnect(isResetCursor));
             cancelPendingRead();
         } else {
             // no consumer connected, complete disconnect immediately
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fa45eb4..8c0e1d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -293,15 +293,23 @@ public class Consumer {
      * pending message acks
      */
     public void close() throws BrokerServiceException {
-        subscription.removeConsumer(this);
+        close(false);
+    }
+
+    public void close(boolean isResetCursor) throws BrokerServiceException {
+        subscription.removeConsumer(this, isResetCursor);
         cnx.removedConsumer(this);
     }
 
     public void disconnect() {
+        disconnect(false);
+    }
+
+    public void disconnect(boolean isResetCursor) {
         log.info("Disconnecting consumer: {}", this);
         cnx.closeConsumer(this);
         try {
-            close();
+            close(isResetCursor);
         } catch (BrokerServiceException e) {
             log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 24118ae..cffa024 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -60,7 +60,11 @@ public interface Dispatcher {
      *
      * @return
      */
-    CompletableFuture<Void> disconnectAllConsumers();
+    CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor);
+
+    default CompletableFuture<Void> disconnectAllConsumers() {
+        return disconnectAllConsumers(false);
+    }
 
     void resetCloseFuture();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index c044fa5..18c7c49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -37,7 +37,11 @@ public interface Subscription {
 
     void addConsumer(Consumer consumer) throws BrokerServiceException;
 
-    void removeConsumer(Consumer consumer) throws BrokerServiceException;
+    default void removeConsumer(Consumer consumer) throws BrokerServiceException {
+        removeConsumer(consumer, false);
+    }
+
+    void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException;
 
     void consumerFlow(Consumer consumer, int additionalNumberOfMessages);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
index 9d5694d..fc8fdbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 
 
-public interface NonPersistentDispatcher extends Dispatcher{
+public interface NonPersistentDispatcher extends Dispatcher {
 
     void addConsumer(Consumer consumer) throws BrokerServiceException;
 
@@ -44,7 +44,7 @@ public interface NonPersistentDispatcher extends Dispatcher{
 
     CompletableFuture<Void> close() ;
 
-    CompletableFuture<Void> disconnectAllConsumers();
+    CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor);
 
     void reset();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5d569a1..5375f39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -164,7 +164,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
     }
 
     @Override
-    public synchronized CompletableFuture<Void> disconnectAllConsumers() {
+    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
         closeFuture = new CompletableFuture<>();
         if (consumerList.isEmpty()) {
             closeFuture.complete(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index e8babda..6520420 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -165,7 +165,7 @@ public class NonPersistentSubscription implements Subscription {
     }
 
     @Override
-    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+    public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
         if (dispatcher != null) {
             dispatcher.removeConsumer(consumer);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index e5a8b64..5d536c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -390,12 +390,12 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
-    public synchronized CompletableFuture<Void> disconnectAllConsumers() {
+    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
         closeFuture = new CompletableFuture<>();
         if (consumerList.isEmpty()) {
             closeFuture.complete(null);
         } else {
-            consumerList.forEach(Consumer::disconnect);
+            consumerList.forEach(consumer -> consumer.disconnect(isResetCursor));
             if (havePendingRead && cursor.cancelPendingReadRequest()) {
                 havePendingRead = false;
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c3bdd24..200d06e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
@@ -256,7 +257,7 @@ public class PersistentSubscription implements Subscription {
     }
 
     @Override
-    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+    public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
         cursor.updateLastActive();
         if (dispatcher != null) {
             dispatcher.removeConsumer(consumer);
@@ -286,6 +287,15 @@ public class PersistentSubscription implements Subscription {
                 // topic.remove again try to access same map which creates deadlock. so, execute it in different thread.
                 topic.getBrokerService().pulsar().getExecutor().submit(() ->{
                     topic.removeSubscription(subName);
+                    // The cursor must also be removed here; otherwise data deletion will not work,
+                    // because it depends on the mark-delete position of all cursors.
+                    if (!isResetCursor) {
+                        try {
+                            topic.getManagedLedger().deleteCursor(cursor.getName());
+                        } catch (InterruptedException | ManagedLedgerException e) {
+                            log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
+                        }
+                    }
                 });
             }
         }
@@ -678,7 +688,7 @@ public class PersistentSubscription implements Subscription {
         // Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
         synchronized (this) {
             if (dispatcher != null && dispatcher.isConsumerConnected()) {
-                disconnectFuture = dispatcher.disconnectAllConsumers();
+                disconnectFuture = dispatcher.disconnectAllConsumers(true);
             } else {
                 disconnectFuture = CompletableFuture.completedFuture(null);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 3eece11..b682c60 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -21,18 +21,15 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
+import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -45,6 +42,7 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -252,4 +250,40 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
 
         assertFalse(reader.hasMessageAvailable());
     }
+
+    /**
+     * Ensure that removing a reader's subscription also deletes its non-durable
+     * cursor, because data deletion depends on the mark-delete position of all cursors.
+     */
+    @Test
+    public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException {
+
+        final String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor";
+
+        @Cleanup
+        Reader<byte[]> reader1 = pulsarClient.newReader()
+            .topic(topic)
+            .startMessageId(MessageId.earliest)
+            .create();
+
+        @Cleanup
+        Reader<byte[]> reader2 = pulsarClient.newReader()
+            .topic(topic)
+            .startMessageId(MessageId.earliest)
+            .create();
+
+        Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 2);
+        Assert.assertEquals(admin.topics().getInternalStats(topic).cursors.size(), 2);
+
+        reader1.close();
+
+        Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 1);
+        Assert.assertEquals(admin.topics().getInternalStats(topic).cursors.size(), 1);
+
+        reader2.close();
+
+        Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 0);
+        Assert.assertEquals(admin.topics().getInternalStats(topic).cursors.size(), 0);
+
+    }
 }