You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/30 02:59:34 UTC
[pulsar] branch branch-2.6 updated: Fix deduplication cursor does
not delete after disabling message deduplication (#7656)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 0a19dec Fix deduplication cursor does not delete after disabling message deduplication (#7656)
0a19dec is described below
commit 0a19dec4e84f153b2b0e98f978e4bba798e0c37a
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 30 09:06:26 2020 +0800
Fix deduplication cursor does not delete after disabling message deduplication (#7656)
### Motivation
Fix deduplication cursor does not delete after disabling message deduplication. The issue occurs when enabling the message deduplication at the broker.conf and then disable it and restart the broker. The dedup cursor will not be deleted.
(cherry picked from commit 2bba620fb47afa6a4dceed36e332059792266313)
---
.../bookkeeper/mledger/ManagedLedgerException.java | 6 +++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +-
.../service/persistent/MessageDeduplication.java | 44 +++++++++++++++++++---
.../pulsar/broker/service/ServerCnxTest.java | 3 --
4 files changed, 46 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 6e6e6a2..0b2cd4e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -149,6 +149,12 @@ public class ManagedLedgerException extends Exception {
}
}
+ public static class CursorNotFoundException extends ManagedLedgerException {
+ public CursorNotFoundException(String msg) {
+ super(msg);
+ }
+ }
+
@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 810b3f8..eaf06fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -774,7 +774,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final Object ctx) {
final ManagedCursorImpl cursor = (ManagedCursorImpl) cursors.get(consumerName);
if (cursor == null) {
- callback.deleteCursorFailed(new ManagedLedgerException("ManagedCursor not found: " + consumerName), ctx);
+ callback.deleteCursorFailed(new ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
+ + consumerName), ctx);
return;
} else if (!cursor.isDurable()) {
cursors.removeCursor(consumerName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 5c91d52..2fb4944 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -60,6 +60,10 @@ public class MessageDeduplication {
private ManagedCursor managedCursor;
enum Status {
+
+ // Deduplication is initialized
+ Initialized,
+
// Deduplication is disabled
Disabled,
@@ -122,7 +126,7 @@ public class MessageDeduplication {
this.pulsar = pulsar;
this.topic = topic;
this.managedLedger = managedLedger;
- this.status = Status.Disabled;
+ this.status = Status.Initialized;
this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();
this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
this.snapshotCounter = 0;
@@ -200,6 +204,25 @@ public class MessageDeduplication {
pulsar.getExecutor().schedule(this::checkStatus, 1, TimeUnit.MINUTES);
return CompletableFuture.completedFuture(null);
}
+ if (status == Status.Initialized && !shouldBeEnabled) {
+ status = Status.Removing;
+ managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new DeleteCursorCallback() {
+ @Override
+ public void deleteCursorComplete(Object ctx) {
+ status = Status.Disabled;
+ log.info("[{}] Deleted deduplication cursor", topic.getName());
+ }
+
+ @Override
+ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
+ if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
+ status = Status.Disabled;
+ } else {
+ log.error("[{}] Deleted deduplication cursor error", topic.getName(), exception);
+ }
+ }
+ }, null);
+ }
if (status == Status.Enabled && !shouldBeEnabled) {
// Disabled deduping
@@ -220,15 +243,24 @@ public class MessageDeduplication {
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
- log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
- exception.getMessage());
- status = Status.Failed;
- future.completeExceptionally(exception);
+ // It's ok for disable message deduplication.
+ if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
+ status = Status.Disabled;
+ managedCursor = null;
+ highestSequencedPushed.clear();
+ highestSequencedPersisted.clear();
+ future.complete(null);
+ } else {
+ log.warn("[{}] Failed to disable deduplication: {}", topic.getName(),
+ exception.getMessage());
+ status = Status.Failed;
+ future.completeExceptionally(exception);
+ }
}
}, null);
return future;
- } else if (status == Status.Disabled && shouldBeEnabled) {
+ } else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) {
// Enable deduping
CompletableFuture<Void> future = new CompletableFuture<>();
managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index dd1176a..94a02f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,7 +22,6 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.any;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -78,7 +77,6 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.ServerCnx.State;
-import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
@@ -143,7 +141,6 @@ public class ServerCnxTest {
private ManagedLedger ledgerMock = mock(ManagedLedger.class);
private ManagedCursor cursorMock = mock(ManagedCursor.class);
-
private OrderedExecutor executor;
@BeforeMethod