You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/30 00:39:50 UTC
[incubator-pulsar] branch master updated: Added multiple position
delete in ManagedLedger (#1450)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 73a214a Added multiple position delete in ManagedLedger (#1450)
73a214a is described below
commit 73a214a59280d95b66c2a25ebe2fdfe898a7cd29
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 29 17:39:48 2018 -0700
Added multiple position delete in ManagedLedger (#1450)
* Added multiple position delete in ManagedLedger
* Removed ref to old bug and readded checkNotNull
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 37 ++++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 86 +++++++++++-----------
.../mledger/impl/ManagedCursorContainerTest.java | 8 ++
.../mledger/impl/ManagedCursorListAckTest.java | 75 +++++++++++++++++++
4 files changed, 165 insertions(+), 41 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index db8e369..f6793b4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -265,6 +265,43 @@ public interface ManagedCursor {
*/
void asyncDelete(Position position, DeleteCallback callback, Object ctx);
+
+ /**
+ * Delete a group of entries.
+ *
+ * <p/>
+ * Mark multiple individual messages for deletion. Once all the previous messages have been deleted, markDelete()
+ * will be called internally to advance the persistent acknowledged position.
+ *
+ * <p/>
+ * The deletion of the messages is not persisted into the durable storage and cannot be recovered upon the reopening
+ * of the ManagedLedger.
+ *
+ * @param positions
+ * positions of the messages to be deleted
+ */
+ void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException;
+
+ /**
+ * Delete a group of messages asynchronously.
+ *
+ * <p/>
+ * Mark a group of messages for deletion. Once all the previous messages have been deleted, markDelete() will be
+ * called internally to advance the persistent acknowledged position.
+ *
+ * <p/>
+ * The deletion of the messages is not persisted into the durable storage and cannot be recovered upon the reopening
+ * of the ManagedLedger.
+ *
+ * @param positions
+ * the positions of the messages to be deleted
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
+ void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx);
+
/**
* Get the read position. This points to the next message to be read from the cursor.
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c2fbbab..9b3d9cd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1447,8 +1447,17 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void delete(final Position position) throws InterruptedException, ManagedLedgerException {
- checkNotNull(position);
- checkArgument(position instanceof PositionImpl);
+ delete(Collections.singletonList(position));
+ }
+
+ @Override
+ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
+ asyncDelete(Collections.singletonList(pos), callback, ctx);
+ }
+
+ @Override
+ public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
+ checkNotNull(positions);
class Result {
ManagedLedgerException exception = null;
@@ -1458,12 +1467,12 @@ public class ManagedCursorImpl implements ManagedCursor {
final CountDownLatch counter = new CountDownLatch(1);
final AtomicBoolean timeout = new AtomicBoolean(false);
- asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
+ asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback deleteComplete at position {}",
- ledger.getName(), name, position);
+ ledger.getName(), name, positions);
}
counter.countDown();
@@ -1475,7 +1484,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (timeout.get()) {
log.warn("[{}] [{}] Delete operation timeout. Callback deleteFailed at position {}",
- ledger.getName(), name, position);
+ ledger.getName(), name, positions);
}
counter.countDown();
@@ -1485,7 +1494,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
timeout.set(true);
log.warn("[{}] [{}] Delete operation timeout. No callback was triggered at position {}", ledger.getName(),
- name, position);
+ name, positions);
throw new ManagedLedgerException("Timeout during delete operation");
}
@@ -1494,46 +1503,37 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
- @Override
- public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
- checkArgument(pos instanceof PositionImpl);
- if (STATE_UPDATER.get(this) == State.Closed) {
+ @Override
+ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallback callback, Object ctx) {
+ if (state == State.Closed) {
callback.deleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
return;
}
- PositionImpl position = (PositionImpl) pos;
-
- PositionImpl previousPosition = ledger.getPreviousPosition(position);
PositionImpl newMarkDeletePosition = null;
lock.writeLock().lock();
try {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Deleting single message at {}. "
- + "Current status: {} - md-position: {} - previous-position: {}",
- ledger.getName(), name, pos, individualDeletedMessages, markDeletePosition, previousPosition);
+ log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
+ ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition);
}
- if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
- }
- callback.deleteComplete(ctx);
- return;
- }
+ for (Position pos : positions) {
+ PositionImpl position = (PositionImpl) checkNotNull(pos);
- if (previousPosition.compareTo(markDeletePosition) == 0 && individualDeletedMessages.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Immediately mark-delete to position {}", ledger.getName(), name, position);
+ if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
+ }
+ continue;
}
- newMarkDeletePosition = position;
- } else {
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
// the RangeSet recognize the "continuity" between adjacent Positions
+ PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.add(Range.openClosed(previousPosition, position));
++messagesConsumedCounter;
@@ -1541,24 +1541,28 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
+ }
- // If the lower bound of the range set is the current mark delete position, then we can trigger a new
- // mark
- // delete to the upper bound of the first range segment
- Range<PositionImpl> range = individualDeletedMessages.asRanges().iterator().next();
+ if (individualDeletedMessages.isEmpty()) {
+ // No changes to individually deleted messages, so nothing to do at this point
+ callback.deleteComplete(ctx);
+ return;
+ }
- // Bug:7062188 - markDeletePosition can sometimes be stuck at the beginning of an empty ledger.
- // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
- if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
- .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
+ // If the lower bound of the range set is the current mark delete position, then we can trigger a new
+ // mark-delete to the upper bound of the first range segment
+ Range<PositionImpl> range = individualDeletedMessages.asRanges().iterator().next();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
- name, range);
- }
+ // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
+ if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
+ .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
- newMarkDeletePosition = range.upperEndpoint();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
+ name, range);
}
+
+ newMarkDeletePosition = range.upperEndpoint();
}
if (newMarkDeletePosition != null) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 5ad25f5..de98b60 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -156,6 +156,14 @@ public class ManagedCursorContainerTest {
}
@Override
+ public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
+ }
+
+ @Override
+ public void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx) {
+ }
+
+ @Override
public void clearBacklog() throws InterruptedException, ManagedLedgerException {
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
new file mode 100644
index 0000000..aaf7cd1
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import java.nio.charset.Charset;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.testng.annotations.Test;
+
+public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
+
+ private static final Charset Encoding = Charsets.UTF_8;
+
+ @Test(timeOut = 20000)
+ void testMultiPositionDelete() throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+
+ ManagedCursor c1 = ledger.openCursor("c1");
+ Position p0 = c1.getMarkDeletedPosition();
+ Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+ Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+ Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
+ Position p5 = ledger.addEntry("dummy-entry-5".getBytes(Encoding));
+ Position p6 = ledger.addEntry("dummy-entry-6".getBytes(Encoding));
+ Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));
+
+ assertEquals(c1.getNumberOfEntries(), 7);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 7);
+
+ c1.delete(Lists.newArrayList(p2, p3, p5, p7));
+
+ assertEquals(c1.getNumberOfEntries(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getMarkDeletedPosition(), p0);
+
+ c1.delete(Lists.newArrayList(p1));
+
+ assertEquals(c1.getNumberOfEntries(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getMarkDeletedPosition(), p3);
+
+ c1.delete(Lists.newArrayList(p4, p6, p7));
+
+ assertEquals(c1.getNumberOfEntries(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getMarkDeletedPosition(), p7);
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.