You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/09/06 18:46:25 UTC
[pulsar] branch master updated: Trim deleted entries after recover
cursor. (#4987)
This is an automated email from the ASF dual-hosted git repository.
aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dc7d01e Trim deleted entries after recover cursor. (#4987)
dc7d01e is described below
commit dc7d01efc6cf2df5631bc509758f2212bede35ce
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Sep 7 02:46:18 2019 +0800
Trim deleted entries after recover cursor. (#4987)
* Trim deleted entries after recover cursor.
* Fix errors
* Add managed cursor unit tests.
* Fix tests and handle cursor reset.
* fix unit tests
* Fix tests
* Fix check style
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 14 +++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 13 ++++-
.../mledger/impl/ManagedCursorContainerTest.java | 11 ++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 59 ++++++++++++++++++++++
.../apache/pulsar/broker/service/Dispatcher.java | 4 ++
.../PersistentDispatcherMultipleConsumers.java | 25 +++++++++
.../service/persistent/PersistentSubscription.java | 3 ++
.../ConcurrentOpenLongPairRangeSet.java | 14 +++++
.../common/util/collections/LongPairRangeSet.java | 19 +++++++
.../ConcurrentOpenLongPairRangeSetTest.java | 23 +++++++++
10 files changed, 184 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 03a68ba..ae04269 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -20,9 +20,12 @@ package org.apache.bookkeeper.mledger;
import com.google.common.annotations.Beta;
import com.google.common.base.Predicate;
+import com.google.common.collect.Range;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -30,6 +33,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
/**
* A ManagedCursor is a persisted cursor inside a ManagedLedger.
@@ -594,4 +598,14 @@ public interface ManagedCursor {
*/
ManagedLedger getManagedLedger();
+ /**
+ * Get the last range of individually deleted entries tracked by this cursor.
+ *
+ * @return the last individually deleted range; implementations may return {@code null}
+ *         when there is no such range
+ */
+ Range<PositionImpl> getLastIndividualDeletedRange();
+
+ /**
+ * Trim the entries that this cursor already considers deleted out of the given list.
+ *
+ * @param entries
+ *            the entries to filter; implementations are expected to modify this list in place,
+ *            so it must support element removal
+ */
+ void trimDeletedEntries(List<Entry> entries);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8a6f9a2..cc425ae 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1050,7 +1050,7 @@ public class ManagedCursorImpl implements ManagedCursor {
positions.stream()
.filter(position -> individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId())
- || ((PositionImpl) position).compareTo(markDeletePosition) < 0)
+ || ((PositionImpl) position).compareTo(markDeletePosition) <= 0)
.forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
@@ -2590,5 +2590,16 @@ public class ManagedCursorImpl implements ManagedCursor {
return this.ledger;
}
+    /**
+     * Returns the last range held in {@code individualDeletedMessages}.
+     *
+     * @return whatever {@code individualDeletedMessages.lastRange()} yields
+     */
+    @Override
+    public Range<PositionImpl> getLastIndividualDeletedRange() {
+        final Range<PositionImpl> lastDeletedRange = individualDeletedMessages.lastRange();
+        return lastDeletedRange;
+    }
+
+    /**
+     * Drops from {@code entries} every entry this cursor has already acknowledged: entries at or before
+     * the mark-delete position, and entries contained in {@code individualDeletedMessages}.
+     *
+     * <p>Each dropped entry is released before it is removed from the list. Once trimmed, an entry is no
+     * longer reachable through {@code entries}, so the caller cannot release it afterwards.
+     *
+     * @param entries
+     *            the entries to filter; modified in place, so the list must support removal
+     */
+    @Override
+    public void trimDeletedEntries(List<Entry> entries) {
+        entries.removeIf(entry -> {
+            boolean alreadyDeleted = ((PositionImpl) entry.getPosition()).compareTo(markDeletePosition) <= 0
+                    || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId());
+            if (alreadyDeleted) {
+                // The entry is about to disappear from the list; release it here or its buffer leaks.
+                entry.release();
+            }
+            return alreadyDeleted;
+        });
+    }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index c415320..21f5747 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.fail;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
@@ -315,6 +316,16 @@ public class ManagedCursorContainerTest {
return null;
}
+        /** Stub implementation; always returns {@code null}. */
+        @Override
+        public Range<PositionImpl> getLastIndividualDeletedRange() {
+            // Not needed by the tests in this class.
+            return null;
+        }
+
+        /** Stub implementation; leaves the given list untouched. */
+        @Override
+        public void trimDeletedEntries(List<Entry> entries) {
+            // Intentionally empty.
+        }
+
}
@Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 70db43c..b0db926 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.fail;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
@@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+
+import io.netty.buffer.ByteBufAllocator;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -2226,6 +2229,62 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
@Test(timeOut = 20000)
+    void testGetLastIndividualDeletedRange() throws Exception {
+        // Verifies that the cursor reports the highest individually deleted range, and that the
+        // reported range moves forward when a later entry is deleted.
+        ManagedLedger ledger = factory.open("test_last_individual_deleted");
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        // All positions below are expressed as offsets from the initial mark-delete position.
+        PositionImpl start = (PositionImpl) cursor.getMarkDeletedPosition();
+        final long ledgerId = start.getLedgerId();
+        final long baseEntryId = start.getEntryId();
+
+        int written = 0;
+        while (written < 10) {
+            ledger.addEntry(("entry" + written).getBytes(Encoding));
+            written++;
+        }
+
+        PositionImpl p1 = PositionImpl.get(ledgerId, baseEntryId + 1);
+        PositionImpl p2 = PositionImpl.get(ledgerId, baseEntryId + 2);
+        PositionImpl p3 = PositionImpl.get(ledgerId, baseEntryId + 5);
+        PositionImpl p4 = PositionImpl.get(ledgerId, baseEntryId + 6);
+
+        // Two separate holes: (base, base + 2] and (base + 4, base + 6]; the second one is the last.
+        cursor.delete(Lists.newArrayList(p1, p2, p3, p4));
+        assertEquals(cursor.getLastIndividualDeletedRange(),
+                Range.openClosed(PositionImpl.get(ledgerId, baseEntryId + 4), p4));
+
+        // Deleting a single later entry creates a new last range of exactly one entry.
+        PositionImpl p5 = PositionImpl.get(ledgerId, baseEntryId + 8);
+        cursor.delete(p5);
+        assertEquals(cursor.getLastIndividualDeletedRange(),
+                Range.openClosed(PositionImpl.get(ledgerId, baseEntryId + 7), p5));
+    }
+
+    @Test(timeOut = 20000)
+    void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedException {
+        // Verifies that trimDeletedEntries() removes the individually deleted entries and keeps the rest.
+        ManagedLedger ledger = factory.open("my_test_ledger");
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
+
+        // All positions below are expressed as offsets from the initial mark-delete position.
+        PositionImpl start = (PositionImpl) cursor.getMarkDeletedPosition();
+        final long ledgerId = start.getLedgerId();
+        final long baseEntryId = start.getEntryId();
+
+        int written = 0;
+        while (written < 10) {
+            ledger.addEntry(("entry" + written).getBytes(Encoding));
+            written++;
+        }
+
+        PositionImpl p1 = PositionImpl.get(ledgerId, baseEntryId + 1);
+        PositionImpl p2 = PositionImpl.get(ledgerId, baseEntryId + 2);
+        PositionImpl p3 = PositionImpl.get(ledgerId, baseEntryId + 5);
+        PositionImpl p4 = PositionImpl.get(ledgerId, baseEntryId + 6);
+        cursor.delete(Lists.newArrayList(p1, p2, p3, p4));
+
+        // Four entries at deleted positions, followed by one entry that was never deleted.
+        List<Entry> entries = Lists.newArrayList();
+        for (PositionImpl deleted : new PositionImpl[] {p1, p2, p3, p4}) {
+            entries.add(EntryImpl.create(deleted, ByteBufAllocator.DEFAULT.buffer(0)));
+        }
+        entries.add(EntryImpl.create(ledgerId, baseEntryId + 7, ByteBufAllocator.DEFAULT.buffer(0)));
+
+        cursor.trimDeletedEntries(entries);
+
+        assertEquals(entries.size(), 1);
+        assertEquals(entries.get(0).getPosition(), PositionImpl.get(ledgerId, baseEntryId + 7));
+    }
+
+ @Test(timeOut = 20000)
void outOfOrderAcks() throws Exception {
ManagedLedger ledger = factory.open("outOfOrderAcks");
ManagedCursor c1 = ledger.openCursor("c1");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index cda9c09..5e6e72e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -94,4 +94,8 @@ public interface Dispatcher {
default long getNumberOfDelayedMessages() {
return 0;
}
+
+    /**
+     * Callback invoked by the subscription after its cursor has been reset.
+     * The default implementation does nothing.
+     */
+    default void cursorIsReset() {
+        // No-op by default.
+    }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4dff4c4..91e4ab7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import com.google.common.collect.Range;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -72,6 +73,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
+ protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
private CompletableFuture<Void> closeFuture = null;
LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
@@ -106,6 +108,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
super(subscription);
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.cursor = cursor;
+ this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
this.topic = topic;
this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
@@ -431,6 +434,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+
+ if (entries == null || entries.size() == 0) {
+ return;
+ }
+ if (needTrimAckedMessages()) {
+ cursor.trimDeletedEntries(entries);
+ }
int start = 0;
int entriesToDispatch = entries.size();
long totalMessagesSent = 0;
@@ -558,6 +568,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
+    /**
+     * Tells whether entries read from the cursor may still include already-deleted ones and therefore
+     * need to be trimmed before dispatch.
+     *
+     * @return {@code true} while the upper endpoint of the individually deleted range captured when this
+     *         dispatcher was created is still ahead of the cursor's read position; {@code false} once the
+     *         read position has passed it or the captured range has been cleared
+     */
+    private boolean needTrimAckedMessages() {
+        // Read the volatile field exactly once: cursorIsReset() can set it to null between a null check
+        // and a later dereference, which would otherwise surface as a NullPointerException here.
+        final Range<PositionImpl> recoveredRange = lastIndividualDeletedRangeFromCursorRecovery;
+        if (recoveredRange == null) {
+            return false;
+        }
+        return recoveredRange.upperEndpoint().compareTo((PositionImpl) cursor.getReadPosition()) > 0;
+    }
/**
* returns true only if {@link consumerList} has at least one unblocked consumer and has available permits
@@ -726,6 +744,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
}
+    /**
+     * Forgets the individually deleted range captured when this dispatcher was created, so that
+     * {@code needTrimAckedMessages()} reports {@code false} from now on.
+     */
+    @Override
+    public void cursorIsReset() {
+        if (this.lastIndividualDeletedRangeFromCursorRecovery == null) {
+            // Already cleared; nothing to do.
+            return;
+        }
+        this.lastIndividualDeletedRangeFromCursorRecovery = null;
+    }
+
public PersistentTopic getTopic() {
return topic;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 20d64ed..4a4e3aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -651,6 +651,9 @@ public class PersistentSubscription implements Subscription {
log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName,
finalPosition);
}
+ if (dispatcher != null) {
+ dispatcher.cursorIsReset();
+ }
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.complete(null);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 3af8fb7..0d14119 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -222,6 +222,9 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
@Override
public Range<T> firstRange() {
+ if (rangeBitSetMap.isEmpty()) {
+ return null;
+ }
Entry<Long, BitSet> firstSet = rangeBitSetMap.firstEntry();
int lower = firstSet.getValue().nextSetBit(0);
int upper = Math.max(lower, firstSet.getValue().nextClearBit(lower) - 1);
@@ -229,6 +232,17 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
}
@Override
+    public Range<T> lastRange() {
+        // Returns the range ending at the highest set bit of the highest key, or null when the set is empty.
+        // Fetch the entry once and null-check it: a separate isEmpty() check followed by lastEntry()
+        // can race with a concurrent removal and dereference a null entry.
+        Entry<Long, BitSet> lastSet = rangeBitSetMap.lastEntry();
+        if (lastSet == null) {
+            return null;
+        }
+        BitSet bits = lastSet.getValue();
+        // length() is "highest set bit + 1", so this is the index of the highest set bit (-1 if none).
+        int upper = bits.length() - 1;
+        // The range is open at its lower end, so the lower endpoint is the clear bit just below the run.
+        int lower = Math.min(bits.previousClearBit(upper), upper);
+        return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
+    }
+
+ @Override
public int size() {
if (updatedAfterCachedForSize) {
AtomicInteger size = new AtomicInteger(0);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index 2187ffe..0d19635 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.common.util.collections;
import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
+
import java.util.Collection;
+import java.util.List;
import java.util.Set;
/**
@@ -115,6 +118,13 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
Range<T> firstRange();
/**
+ * Returns the last range in the range set.
+ *
+ * @return the last range in the set, or {@code null} if the set is empty
+ */
+ Range<T> lastRange();
+
+ /**
* Represents a function that accepts two long arguments and produces a result.
*
* @param <T> the type of the result.
@@ -260,6 +270,15 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
}
@Override
+        public Range<T> lastRange() {
+            // Returns the last range of the underlying range set, or null when it holds no ranges.
+            // The descending view hands out the last range first, so there is no need to copy every
+            // range into a temporary list just to read its final element.
+            Set<Range<T>> descendingRanges = set.asDescendingSetOfRanges();
+            return descendingRanges.isEmpty() ? null : descendingRanges.iterator().next();
+        }
+
+ @Override
public int size() {
return set.asRanges().size();
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 210b7e6..4adf14f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -197,6 +197,7 @@ public class ConcurrentOpenLongPairRangeSetTest {
@Test
public void testFirstRange() {
ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ assertNull(set.firstRange());
Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
set.add(range);
assertEquals(set.firstRange(), range);
@@ -212,6 +213,28 @@ public class ConcurrentOpenLongPairRangeSetTest {
}
@Test
+    public void testLastRange() {
+        ConcurrentOpenLongPairRangeSet<LongPair> rangeSet = new ConcurrentOpenLongPairRangeSet<>(consumer);
+
+        // An empty set has no last range.
+        assertNull(rangeSet.lastRange());
+
+        // A single range is both first and last.
+        Range<LongPair> initial = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
+        rangeSet.add(initial);
+        assertEquals(rangeSet.lastRange(), initial);
+        assertEquals(rangeSet.size(), 1);
+
+        // An overlapping range under the same key is merged into the existing one.
+        Range<LongPair> overlapping = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105));
+        rangeSet.add(overlapping);
+        assertEquals(rangeSet.lastRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 105)));
+        assertEquals(rangeSet.size(), 1);
+
+        // A range under a higher key becomes the new last range.
+        Range<LongPair> higherKey = Range.openClosed(new LongPair(1, 5), new LongPair(1, 75));
+        rangeSet.add(higherKey);
+        assertEquals(rangeSet.lastRange(), higherKey);
+        assertEquals(rangeSet.size(), 2);
+
+        // A disjoint, later range under that same key takes over as the last range.
+        Range<LongPair> laterSameKey = Range.openClosed(new LongPair(1, 80), new LongPair(1, 120));
+        rangeSet.add(laterSameKey);
+        assertEquals(rangeSet.lastRange(), laterSameKey);
+        assertEquals(rangeSet.size(), 3);
+    }
+
+ @Test
public void testToString() {
ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));