You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/03/08 11:05:40 UTC
[pulsar] branch master updated: Set the dedup cursor as "inactive"
after recovery (#3612)
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a337e04 Set the dedup cursor as "inactive" after recovery (#3612)
a337e04 is described below
commit a337e0482e4f20a3b2b0ff21cf614741ce295777
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Mar 8 03:05:35 2019 -0800
Set the dedup cursor as "inactive" after recovery (#3612)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 6 ++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 +++++++++++-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 ++++++------
.../mledger/impl/ManagedCursorContainerTest.java | 4 ++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 15 +++++++++++++++
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++++--
.../service/persistent/CompactorSubscription.java | 3 +++
.../service/persistent/MessageDeduplication.java | 22 +---------------------
8 files changed, 50 insertions(+), 30 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a1187b8..e1bcec7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -519,6 +519,12 @@ public interface ManagedCursor {
void setInactive();
/**
+ * A cursor that is set as always-inactive will never trigger the caching of
+ * entries.
+ */
+ void setAlwaysInactive();
+
+ /**
* Checks if cursor is active or not.
*
* @return
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1d5fe67..86de809 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -132,6 +132,8 @@ public class ManagedCursorImpl implements ManagedCursor {
private RateLimiter markDeleteLimiter;
+ private boolean alwaysInactive = false;
+
class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
@@ -785,7 +787,9 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void setActive() {
- ledger.activateCursor(this);
+ if (!alwaysInactive) {
+ ledger.activateCursor(this);
+ }
}
@Override
@@ -799,6 +803,12 @@ public class ManagedCursorImpl implements ManagedCursor {
}
@Override
+ public void setAlwaysInactive() {
+ setInactive();
+ this.alwaysInactive = true;
+ }
+
+ @Override
public Position getFirstPosition() {
Long firstLedgerId = ledger.getLedgersInfo().firstKey();
return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e1376f9..fa82bdf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -169,7 +169,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
final EntryCache entryCache;
-
+
private ScheduledFuture<?> timeoutTask;
/**
@@ -334,7 +334,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
});
-
+
scheduleTimeoutTask();
}
@@ -1174,7 +1174,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
closeAllCursors(callback, ctx);
}, null);
-
+
if (this.timeoutTask != null) {
this.timeoutTask.cancel(false);
}
@@ -1698,7 +1698,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
// assigned to different request
boolean isRecycled = (ctx != null && ctx instanceof Integer) && (Integer) ctx != readOpCount;
- // consider callback is completed if: Callback is already recycled or read-complete flag is true
+ // consider callback is completed if: Callback is already recycled or read-complete flag is true
return isRecycled || !READ_COMPLETED_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, FALSE, TRUE);
}
@@ -3010,7 +3010,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
/**
* Create ledger async and schedule a timeout task to check ledger-creation is complete else it fails the callback
* with TimeoutException.
- *
+ *
* @param bookKeeper
* @param config
* @param digestType
@@ -3038,7 +3038,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
- *
+ *
* @param rc
* @param lh
* @param ctx
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 3e404d9..458ca4c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -223,6 +223,10 @@ public class ManagedCursorContainerTest {
}
@Override
+ public void setAlwaysInactive() {
+ }
+
+ @Override
public List<Entry> replayEntries(Set<? extends Position> positions)
throws InterruptedException, ManagedLedgerException {
return null;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5b64468..ef9bfb0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2831,5 +2831,20 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
});
}
+ @Test
+ void testAlwaysInactive() throws Exception {
+ ManagedLedger ml = factory.open("testAlwaysInactive");
+ ManagedCursor cursor = ml.openCursor("c1");
+
+ assertTrue(cursor.isActive());
+
+ cursor.setAlwaysInactive();
+
+ assertFalse(cursor.isActive());
+
+ cursor.setActive();
+ assertFalse(cursor.isActive());
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2a117ec..2890b0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -400,7 +400,7 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
-
+
try {
getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
@@ -705,7 +705,7 @@ public class PersistentTopicsBase extends AdminResource {
}
return stats;
}
-
+
protected void internalDeleteSubscription(String subName, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
@@ -967,6 +967,8 @@ public class PersistentTopicsBase extends AdminResource {
PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest).get();
+ // Mark the cursor as "inactive" as it was created without a real consumer connected
+ subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, messageId);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 6fe74bf..316ebc5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -45,6 +45,9 @@ public class CompactorSubscription extends PersistentSubscription {
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
this.compactedTopic = compactedTopic;
+ // Avoid compactor cursor to cause entries to be cached
+ this.cursor.setAlwaysInactive();
+
Map<String, Long> properties = cursor.getProperties();
if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) {
long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 443d29b..daf9646 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -166,26 +166,6 @@ public class MessageDeduplication {
}, null);
}
- public CompletableFuture<Void> initialize() {
- // Check whether the dedup cursor was already present
- for (ManagedCursor cursor : managedLedger.getCursors()) {
- if (cursor.getName().equals(PersistentTopic.DEDUPLICATION_CURSOR_NAME)) {
- // Deduplication was enabled before
- this.status = Status.Recovering;
- this.managedCursor = cursor;
- break;
- }
- }
-
- if (status == Status.Recovering) {
- // Recover the current cursor and then check the configuration
- return recoverSequenceIdsMap().thenCompose(v -> checkStatus());
- } else {
- // No-op
- return CompletableFuture.completedFuture(null);
- }
- }
-
public Status getStatus() {
return status;
}
@@ -238,7 +218,7 @@ public class MessageDeduplication {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
// We don't want to retain cache for this cursor
- cursor.setInactive();
+ cursor.setAlwaysInactive();
managedCursor = cursor;
recoverSequenceIdsMap().thenRun(() -> {
status = Status.Enabled;