You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/30 02:59:34 UTC

[pulsar] branch branch-2.6 updated: Fix deduplication cursor does not delete after disabling message deduplication (#7656)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 0a19dec  Fix deduplication cursor does not delete after disabling message deduplication (#7656)
0a19dec is described below

commit 0a19dec4e84f153b2b0e98f978e4bba798e0c37a
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 30 09:06:26 2020 +0800

    Fix deduplication cursor does not delete after disabling message deduplication (#7656)
    
    ### Motivation
    
    Fix the deduplication cursor not being deleted after message deduplication is disabled. The issue occurs when message deduplication is enabled in broker.conf, then disabled, and the broker is restarted: the dedup cursor is not deleted.
    
    (cherry picked from commit 2bba620fb47afa6a4dceed36e332059792266313)
---
 .../bookkeeper/mledger/ManagedLedgerException.java |  6 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../service/persistent/MessageDeduplication.java   | 44 +++++++++++++++++++---
 .../pulsar/broker/service/ServerCnxTest.java       |  3 --
 4 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 6e6e6a2..0b2cd4e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -149,6 +149,12 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class CursorNotFoundException extends ManagedLedgerException {
+        public CursorNotFoundException(String msg) {
+            super(msg);
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 810b3f8..eaf06fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -774,7 +774,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             final Object ctx) {
         final ManagedCursorImpl cursor = (ManagedCursorImpl) cursors.get(consumerName);
         if (cursor == null) {
-            callback.deleteCursorFailed(new ManagedLedgerException("ManagedCursor not found: " + consumerName), ctx);
+            callback.deleteCursorFailed(new ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
+                    + consumerName), ctx);
             return;
         } else if (!cursor.isDurable()) {
             cursors.removeCursor(consumerName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 5c91d52..2fb4944 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -60,6 +60,10 @@ public class MessageDeduplication {
     private ManagedCursor managedCursor;
 
     enum Status {
+
+        // Deduplication is initialized
+        Initialized,
+
         // Deduplication is disabled
         Disabled,
 
@@ -122,7 +126,7 @@ public class MessageDeduplication {
         this.pulsar = pulsar;
         this.topic = topic;
         this.managedLedger = managedLedger;
-        this.status = Status.Disabled;
+        this.status = Status.Initialized;
         this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();
         this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
         this.snapshotCounter = 0;
@@ -200,6 +204,25 @@ public class MessageDeduplication {
                     pulsar.getExecutor().schedule(this::checkStatus, 1, TimeUnit.MINUTES);
                     return CompletableFuture.completedFuture(null);
                 }
+                if (status == Status.Initialized && !shouldBeEnabled) {
+                    status = Status.Removing;
+                    managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new DeleteCursorCallback() {
+                        @Override
+                        public void deleteCursorComplete(Object ctx) {
+                            status = Status.Disabled;
+                            log.info("[{}] Deleted deduplication cursor", topic.getName());
+                        }
+
+                        @Override
+                        public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
+                            if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
+                                status = Status.Disabled;
+                            } else {
+                                log.error("[{}] Deleted deduplication cursor error", topic.getName(), exception);
+                            }
+                        }
+                    }, null);
+                }
 
                 if (status == Status.Enabled && !shouldBeEnabled) {
                     // Disabled deduping
@@ -220,15 +243,24 @@ public class MessageDeduplication {
 
                                 @Override
                                 public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
-                                    log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
-                                            exception.getMessage());
-                                    status = Status.Failed;
-                                    future.completeExceptionally(exception);
+                                    // A missing cursor is not an error when disabling message deduplication.
+                                    if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
+                                        status = Status.Disabled;
+                                        managedCursor = null;
+                                        highestSequencedPushed.clear();
+                                        highestSequencedPersisted.clear();
+                                        future.complete(null);
+                                    } else {
+                                        log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
+                                                exception.getMessage());
+                                        status = Status.Failed;
+                                        future.completeExceptionally(exception);
+                                    }
                                 }
                             }, null);
 
                     return future;
-                } else if (status == Status.Disabled && shouldBeEnabled) {
+                } else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) {
                     // Enable deduping
                     CompletableFuture<Void> future = new CompletableFuture<>();
                     managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index dd1176a..94a02f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,7 +22,6 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -78,7 +77,6 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
-import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
@@ -143,7 +141,6 @@ public class ServerCnxTest {
 
     private ManagedLedger ledgerMock = mock(ManagedLedger.class);
     private ManagedCursor cursorMock = mock(ManagedCursor.class);
-
     private OrderedExecutor executor;
 
     @BeforeMethod