You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2019/03/08 11:05:40 UTC

[pulsar] branch master updated: Set the dedup cursor as "inactive" after recovery (#3612)

This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a337e04  Set the dedup cursor as "inactive" after recovery (#3612)
a337e04 is described below

commit a337e0482e4f20a3b2b0ff21cf614741ce295777
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Mar 8 03:05:35 2019 -0800

    Set the dedup cursor as "inactive" after recovery (#3612)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 12 +++++++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 ++++++------
 .../mledger/impl/ManagedCursorContainerTest.java   |  4 ++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 15 +++++++++++++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++++--
 .../service/persistent/CompactorSubscription.java  |  3 +++
 .../service/persistent/MessageDeduplication.java   | 22 +---------------------
 8 files changed, 50 insertions(+), 30 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a1187b8..e1bcec7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -519,6 +519,12 @@ public interface ManagedCursor {
     void setInactive();
 
     /**
+     * A cursor that is set as always-inactive will never trigger the caching of
+     * entries.
+     */
+    void setAlwaysInactive();
+
+    /**
      * Checks if cursor is active or not.
      *
      * @return
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1d5fe67..86de809 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -132,6 +132,8 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     private RateLimiter markDeleteLimiter;
 
+    private boolean alwaysInactive = false;
+
     class MarkDeleteEntry {
         final PositionImpl newPosition;
         final MarkDeleteCallback callback;
@@ -785,7 +787,9 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public void setActive() {
-        ledger.activateCursor(this);
+        if (!alwaysInactive) {
+            ledger.activateCursor(this);
+        }
     }
 
     @Override
@@ -799,6 +803,12 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
+    public void setAlwaysInactive() {
+        setInactive();
+        this.alwaysInactive = true;
+    }
+
+    @Override
     public Position getFirstPosition() {
         Long firstLedgerId = ledger.getLedgersInfo().firstKey();
         return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e1376f9..fa82bdf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -169,7 +169,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
 
     final EntryCache entryCache;
-    
+
     private ScheduledFuture<?> timeoutTask;
 
     /**
@@ -334,7 +334,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 }
             }
         });
-        
+
         scheduleTimeoutTask();
     }
 
@@ -1174,7 +1174,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
             closeAllCursors(callback, ctx);
         }, null);
-        
+
         if (this.timeoutTask != null) {
             this.timeoutTask.cancel(false);
         }
@@ -1698,7 +1698,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             // if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
             // assigned to different request
             boolean isRecycled = (ctx != null && ctx instanceof Integer) && (Integer) ctx != readOpCount;
-            // consider callback is completed if: Callback is already recycled or read-complete flag is true  
+            // consider callback is completed if: Callback is already recycled or read-complete flag is true
             return isRecycled || !READ_COMPLETED_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, FALSE, TRUE);
         }
 
@@ -3010,7 +3010,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     /**
      * Create ledger async and schedule a timeout task to check ledger-creation is complete else it fails the callback
      * with TimeoutException.
-     * 
+     *
      * @param bookKeeper
      * @param config
      * @param digestType
@@ -3038,7 +3038,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     /**
      * check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
-     * 
+     *
      * @param rc
      * @param lh
      * @param ctx
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 3e404d9..458ca4c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -223,6 +223,10 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public void setAlwaysInactive() {
+        }
+
+        @Override
         public List<Entry> replayEntries(Set<? extends Position> positions)
                 throws InterruptedException, ManagedLedgerException {
             return null;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5b64468..ef9bfb0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2831,5 +2831,20 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         });
     }
 
+    @Test
+    void testAlwaysInactive() throws Exception {
+        ManagedLedger ml = factory.open("testAlwaysInactive");
+        ManagedCursor cursor = ml.openCursor("c1");
+
+        assertTrue(cursor.isActive());
+
+        cursor.setAlwaysInactive();
+
+        assertFalse(cursor.isActive());
+
+        cursor.setActive();
+        assertFalse(cursor.isActive());
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2a117ec..2890b0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -400,7 +400,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalCreateNonPartitionedTopic(boolean authoritative) {
     	validateAdminAccessForTenant(topicName.getTenant());
-    	
+
     	try {
     		getOrCreateTopic(topicName);
     		log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
@@ -705,7 +705,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
         return stats;
     }
-    
+
     protected void internalDeleteSubscription(String subName, boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
@@ -967,6 +967,8 @@ public class PersistentTopicsBase extends AdminResource {
 
                 PersistentSubscription subscription = (PersistentSubscription) topic
                         .createSubscription(subscriptionName, InitialPosition.Latest).get();
+                // Mark the cursor as "inactive" as it was created without a real consumer connected
+                subscription.deactivateCursor();
                 subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
                 log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName,
                         subscriptionName, messageId);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 6fe74bf..316ebc5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -45,6 +45,9 @@ public class CompactorSubscription extends PersistentSubscription {
         checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
         this.compactedTopic = compactedTopic;
 
+        // Prevent the compactor cursor from causing entries to be cached
+        this.cursor.setAlwaysInactive();
+
         Map<String, Long> properties = cursor.getProperties();
         if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) {
             long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 443d29b..daf9646 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -166,26 +166,6 @@ public class MessageDeduplication {
         }, null);
     }
 
-    public CompletableFuture<Void> initialize() {
-        // Check whether the dedup cursor was already present
-        for (ManagedCursor cursor : managedLedger.getCursors()) {
-            if (cursor.getName().equals(PersistentTopic.DEDUPLICATION_CURSOR_NAME)) {
-                // Deduplication was enabled before
-                this.status = Status.Recovering;
-                this.managedCursor = cursor;
-                break;
-            }
-        }
-
-        if (status == Status.Recovering) {
-            // Recover the current cursor and then check the configuration
-            return recoverSequenceIdsMap().thenCompose(v -> checkStatus());
-        } else {
-            // No-op
-            return CompletableFuture.completedFuture(null);
-        }
-    }
-
     public Status getStatus() {
         return status;
     }
@@ -238,7 +218,7 @@ public class MessageDeduplication {
                         @Override
                         public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                             // We don't want to retain cache for this cursor
-                            cursor.setInactive();
+                            cursor.setAlwaysInactive();
                             managedCursor = cursor;
                             recoverSequenceIdsMap().thenRun(() -> {
                                 status = Status.Enabled;