You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/09/06 18:46:25 UTC

[pulsar] branch master updated: Trim deleted entries after recover cursor. (#4987)

This is an automated email from the ASF dual-hosted git repository.

aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dc7d01e  Trim deleted entries after recover cursor. (#4987)
dc7d01e is described below

commit dc7d01efc6cf2df5631bc509758f2212bede35ce
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Sep 7 02:46:18 2019 +0800

    Trim deleted entries after recover cursor. (#4987)
    
    * Trim deleted entries after recover cursor.
    
    * Fix errors
    
    * Add managed cursor unit tests.
    
    * Fix tests and handle cursor reset.
    
    * fix unit tests
    
    * Fix tests
    
    * Fix check style
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 14 +++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 13 ++++-
 .../mledger/impl/ManagedCursorContainerTest.java   | 11 ++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 59 ++++++++++++++++++++++
 .../apache/pulsar/broker/service/Dispatcher.java   |  4 ++
 .../PersistentDispatcherMultipleConsumers.java     | 25 +++++++++
 .../service/persistent/PersistentSubscription.java |  3 ++
 .../ConcurrentOpenLongPairRangeSet.java            | 14 +++++
 .../common/util/collections/LongPairRangeSet.java  | 19 +++++++
 .../ConcurrentOpenLongPairRangeSetTest.java        | 23 +++++++++
 10 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 03a68ba..ae04269 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -20,9 +20,12 @@ package org.apache.bookkeeper.mledger;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Predicate;
+import com.google.common.collect.Range;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -30,6 +33,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 
 /**
  * A ManangedCursor is a persisted cursor inside a ManagedLedger.
@@ -594,4 +598,14 @@ public interface ManagedCursor {
      */
     ManagedLedger getManagedLedger();
 
+    /**
+     * Get last individual deleted range
+     * @return range
+     */
+    Range<PositionImpl> getLastIndividualDeletedRange();
+
+    /**
+     * Trim delete entries for the given entries
+     */
+    void trimDeletedEntries(List<Entry> entries);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8a6f9a2..cc425ae 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1050,7 +1050,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             positions.stream()
                     .filter(position -> individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(),
                             ((PositionImpl) position).getEntryId())
-                            || ((PositionImpl) position).compareTo(markDeletePosition) < 0)
+                            || ((PositionImpl) position).compareTo(markDeletePosition) <= 0)
                     .forEach(alreadyAcknowledgedPositions::add);
         } finally {
             lock.readLock().unlock();
@@ -2590,5 +2590,16 @@ public class ManagedCursorImpl implements ManagedCursor {
         return this.ledger;
     }
 
+    @Override
+    public Range<PositionImpl> getLastIndividualDeletedRange() {
+        return individualDeletedMessages.lastRange();
+    }
+
+    @Override
+    public void trimDeletedEntries(List<Entry> entries) {
+        entries.removeIf(entry -> ((PositionImpl) entry.getPosition()).compareTo(markDeletePosition) <= 0
+                || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()));
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index c415320..21f5747 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.fail;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.List;
@@ -315,6 +316,16 @@ public class ManagedCursorContainerTest {
             return null;
         }
 
+        @Override
+        public Range<PositionImpl> getLastIndividualDeletedRange() {
+            return null;
+        }
+
+        @Override
+        public void trimDeletedEntries(List<Entry> entries) {
+
+        }
+
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 70db43c..b0db926 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.fail;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
@@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+
+import io.netty.buffer.ByteBufAllocator;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -2226,6 +2229,62 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testGetLastIndividualDeletedRange() throws Exception {
+        ManagedLedger ledger = factory.open("test_last_individual_deleted");
+
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition();
+        for(int i = 0; i < 10; i++) {
+            ledger.addEntry(("entry" + i).getBytes(Encoding));
+        }
+        PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
+        PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
+        PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
+        PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);
+
+        c1.delete(Lists.newArrayList(p1, p2, p3, p4));
+
+        assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p3.getLedgerId(),
+                p3.getEntryId() - 1), p4));
+
+        PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 8);
+        c1.delete(p5);
+
+        assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p5.getLedgerId(),
+                p5.getEntryId() - 1), p5));
+
+    }
+
+    @Test(timeOut = 20000)
+    void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("my_test_ledger");
+
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition();
+        for(int i = 0; i < 10; i++) {
+            ledger.addEntry(("entry" + i).getBytes(Encoding));
+        }
+        PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
+        PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
+        PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
+        PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);
+
+        c1.delete(Lists.newArrayList(p1, p2, p3, p4));
+
+        EntryImpl entry1 = EntryImpl.create(p1, ByteBufAllocator.DEFAULT.buffer(0));
+        EntryImpl entry2 = EntryImpl.create(p2, ByteBufAllocator.DEFAULT.buffer(0));
+        EntryImpl entry3 = EntryImpl.create(p3, ByteBufAllocator.DEFAULT.buffer(0));
+        EntryImpl entry4 = EntryImpl.create(p4, ByteBufAllocator.DEFAULT.buffer(0));
+        EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 7,
+                ByteBufAllocator.DEFAULT.buffer(0));
+        List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5);
+        c1.trimDeletedEntries(entries);
+        assertEquals(entries.size(), 1);
+        assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId() ,
+                markDeletedPosition.getEntryId() + 7));
+    }
+
+    @Test(timeOut = 20000)
     void outOfOrderAcks() throws Exception {
         ManagedLedger ledger = factory.open("outOfOrderAcks");
         ManagedCursor c1 = ledger.openCursor("c1");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index cda9c09..5e6e72e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -94,4 +94,8 @@ public interface Dispatcher {
     default long getNumberOfDelayedMessages() {
         return 0;
     }
+
+    default void cursorIsReset() {
+        //No-op
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4dff4c4..91e4ab7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import com.google.common.collect.Range;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -72,6 +73,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
+    protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
 
     private CompletableFuture<Void> closeFuture = null;
     LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
@@ -106,6 +108,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         super(subscription);
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
         this.cursor = cursor;
+        this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
         this.topic = topic;
         this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
@@ -431,6 +434,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+
+        if (entries == null || entries.size() == 0) {
+            return;
+        }
+        if (needTrimAckedMessages()) {
+            cursor.trimDeletedEntries(entries);
+        }
         int start = 0;
         int entriesToDispatch = entries.size();
         long totalMessagesSent = 0;
@@ -558,6 +568,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     }
 
+    private boolean needTrimAckedMessages() {
+        if (lastIndividualDeletedRangeFromCursorRecovery == null) {
+            return false;
+        } else {
+            return lastIndividualDeletedRangeFromCursorRecovery.upperEndpoint()
+                    .compareTo((PositionImpl) cursor.getReadPosition()) > 0;
+        }
+    }
 
     /**
      * returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits
@@ -726,6 +744,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
+    @Override
+    public void cursorIsReset() {
+        if (this.lastIndividualDeletedRangeFromCursorRecovery != null) {
+            this.lastIndividualDeletedRangeFromCursorRecovery = null;
+        }
+    }
+
     public PersistentTopic getTopic() {
         return topic;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 20d64ed..4a4e3aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -651,6 +651,9 @@ public class PersistentSubscription implements Subscription {
                             log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName,
                                     finalPosition);
                         }
+                        if (dispatcher != null) {
+                            dispatcher.cursorIsReset();
+                        }
                         IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                         future.complete(null);
                     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 3af8fb7..0d14119 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -222,6 +222,9 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
 
     @Override
     public Range<T> firstRange() {
+        if (rangeBitSetMap.isEmpty()) {
+            return null;
+        }
         Entry<Long, BitSet> firstSet = rangeBitSetMap.firstEntry();
         int lower = firstSet.getValue().nextSetBit(0);
         int upper = Math.max(lower, firstSet.getValue().nextClearBit(lower) - 1);
@@ -229,6 +232,17 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
     }
 
     @Override
+    public Range<T> lastRange() {
+        if (rangeBitSetMap.isEmpty()) {
+            return null;
+        }
+        Entry<Long, BitSet> lastSet = rangeBitSetMap.lastEntry();
+        int upper = lastSet.getValue().previousSetBit(lastSet.getValue().size());
+        int lower = Math.min(lastSet.getValue().previousClearBit(upper), upper);
+        return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
+    }
+
+    @Override
     public int size() {
         if (updatedAfterCachedForSize) {
             AtomicInteger size = new AtomicInteger(0);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index 2187ffe..0d19635 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.common.util.collections;
 
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.TreeRangeSet;
+
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -115,6 +118,13 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
     Range<T> firstRange();
 
     /**
+     * It returns very last biggest range in the rangeSet.
+     *
+     * @return last biggest range into the set
+     */
+    Range<T> lastRange();
+
+    /**
      * Represents a function that accepts two long arguments and produces a result.
      *
      * @param <T> the type of the result.
@@ -260,6 +270,15 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
         }
 
         @Override
+        public Range<T> lastRange() {
+            if (set.asRanges().isEmpty()) {
+                return null;
+            }
+            List<Range<T>> list = Lists.newArrayList(set.asRanges().iterator());
+            return list.get(list.size() - 1);
+        }
+
+        @Override
         public int size() {
             return set.asRanges().size();
         }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 210b7e6..4adf14f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -197,6 +197,7 @@ public class ConcurrentOpenLongPairRangeSetTest {
     @Test
     public void testFirstRange() {
         ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        assertNull(set.firstRange());
         Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
         set.add(range);
         assertEquals(set.firstRange(), range);
@@ -212,6 +213,28 @@ public class ConcurrentOpenLongPairRangeSetTest {
     }
 
     @Test
+    public void testLastRange() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        assertNull(set.lastRange());
+        Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
+        set.add(range);
+        assertEquals(set.lastRange(), range);
+        assertEquals(set.size(), 1);
+        range = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105));
+        set.add(range);
+        assertEquals(set.lastRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 105)));
+        assertEquals(set.size(), 1);
+        range = Range.openClosed(new LongPair(1, 5), new LongPair(1, 75));
+        set.add(range);
+        assertEquals(set.lastRange(), range);
+        assertEquals(set.size(), 2);
+        range = Range.openClosed(new LongPair(1, 80), new LongPair(1, 120));
+        set.add(range);
+        assertEquals(set.lastRange(), range);
+        assertEquals(set.size(), 3);
+    }
+
+    @Test
     public void testToString() {
         ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
         Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));