You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/30 00:39:50 UTC

[incubator-pulsar] branch master updated: Added multiple position delete in ManagedLedger (#1450)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a214a  Added multiple position delete in ManagedLedger (#1450)
73a214a is described below

commit 73a214a59280d95b66c2a25ebe2fdfe898a7cd29
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 29 17:39:48 2018 -0700

    Added multiple position delete in ManagedLedger (#1450)
    
    * Added multiple position delete in ManagedLedger
    
    * Removed ref to old bug and readded checkNotNull
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 37 ++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 86 +++++++++++-----------
 .../mledger/impl/ManagedCursorContainerTest.java   |  8 ++
 .../mledger/impl/ManagedCursorListAckTest.java     | 75 +++++++++++++++++++
 4 files changed, 165 insertions(+), 41 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index db8e369..f6793b4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -265,6 +265,43 @@ public interface ManagedCursor {
      */
     void asyncDelete(Position position, DeleteCallback callback, Object ctx);
 
+
+    /**
+     * Delete a group of entries.
+     *
+     * <p/>
+     * Mark multiple single messages for deletion. When all the previous messages have been deleted, markDelete()
+     * will be called internally to advance the persistent acknowledged position.
+     *
+     * <p/>
+     * The deletion of the message is not persisted into the durable storage and cannot be recovered upon the reopening
+     * of the ManagedLedger
+     *
+     * @param positions
+     *            positions of the messages to be deleted
+     */
+    void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException;
+
+    /**
+     * Delete a group of messages asynchronously
+     *
+     * <p/>
+     * Mark a group of messages for deletion. When all the previous messages have been deleted, markDelete() will be
+     * called internally to advance the persistent acknowledged position.
+     *
+     * <p/>
+     * The deletion of the messages is not persisted into the durable storage and cannot be recovered upon the reopening
+     * of the ManagedLedger
+     *
+     * @param positions
+     *            the positions of the messages to be deleted
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     */
+    void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx);
+
     /**
      * Get the read position. This points to the next message to be read from the cursor.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c2fbbab..9b3d9cd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1447,8 +1447,17 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public void delete(final Position position) throws InterruptedException, ManagedLedgerException {
-        checkNotNull(position);
-        checkArgument(position instanceof PositionImpl);
+        delete(Collections.singletonList(position));
+    }
+
+    @Override
+    public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
+        asyncDelete(Collections.singletonList(pos), callback, ctx);
+    }
+
+    @Override
+    public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
+        checkNotNull(positions);
 
         class Result {
             ManagedLedgerException exception = null;
@@ -1458,12 +1467,12 @@ public class ManagedCursorImpl implements ManagedCursor {
         final CountDownLatch counter = new CountDownLatch(1);
         final AtomicBoolean timeout = new AtomicBoolean(false);
 
-        asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
+        asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
             @Override
             public void deleteComplete(Object ctx) {
                 if (timeout.get()) {
                     log.warn("[{}] [{}] Delete operation timeout. Callback deleteComplete at position {}",
-                            ledger.getName(), name, position);
+                            ledger.getName(), name, positions);
                 }
 
                 counter.countDown();
@@ -1475,7 +1484,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
                 if (timeout.get()) {
                     log.warn("[{}] [{}] Delete operation timeout. Callback deleteFailed at position {}",
-                            ledger.getName(), name, position);
+                            ledger.getName(), name, positions);
                 }
 
                 counter.countDown();
@@ -1485,7 +1494,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (!counter.await(ManagedLedgerImpl.AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
             timeout.set(true);
             log.warn("[{}] [{}] Delete operation timeout. No callback was triggered at position {}", ledger.getName(),
-                    name, position);
+                    name, positions);
             throw new ManagedLedgerException("Timeout during delete operation");
         }
 
@@ -1494,46 +1503,37 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
-    @Override
-    public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
-        checkArgument(pos instanceof PositionImpl);
 
-        if (STATE_UPDATER.get(this) == State.Closed) {
+    @Override
+    public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallback callback, Object ctx) {
+        if (state == State.Closed) {
             callback.deleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
             return;
         }
 
-        PositionImpl position = (PositionImpl) pos;
-
-        PositionImpl previousPosition = ledger.getPreviousPosition(position);
         PositionImpl newMarkDeletePosition = null;
 
         lock.writeLock().lock();
 
         try {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Deleting single message at {}. "
-                        + "Current status: {} - md-position: {}  - previous-position: {}",
-                        ledger.getName(), name, pos, individualDeletedMessages, markDeletePosition, previousPosition);
+                log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
+                        ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition);
             }
 
-            if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
-                }
-                callback.deleteComplete(ctx);
-                return;
-            }
+            for (Position pos : positions) {
+                PositionImpl position  = (PositionImpl) checkNotNull(pos);
 
-            if (previousPosition.compareTo(markDeletePosition) == 0 && individualDeletedMessages.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}][{}] Immediately mark-delete to position {}", ledger.getName(), name, position);
+                if (individualDeletedMessages.contains(position) || position.compareTo(markDeletePosition) <= 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
+                    }
+                    continue;
                 }
 
-                newMarkDeletePosition = position;
-            } else {
                 // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
                 // the RangeSet recognize the "continuity" between adjacent Positions
+                PositionImpl previousPosition = ledger.getPreviousPosition(position);
                 individualDeletedMessages.add(Range.openClosed(previousPosition, position));
                 ++messagesConsumedCounter;
 
@@ -1541,24 +1541,28 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
                             individualDeletedMessages);
                 }
+            }
 
-                // If the lower bound of the range set is the current mark delete position, then we can trigger a new
-                // mark
-                // delete to the upper bound of the first range segment
-                Range<PositionImpl> range = individualDeletedMessages.asRanges().iterator().next();
+            if (individualDeletedMessages.isEmpty()) {
+                // The set of individually deleted messages is empty, so there is nothing to do at this point
+                callback.deleteComplete(ctx);
+                return;
+            }
 
-                // Bug:7062188 - markDeletePosition can sometimes be stuck at the beginning of an empty ledger.
-                // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
-                if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
-                        .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
+            // If the lower bound of the range set is the current mark delete position, then we can trigger a new
+            // mark-delete to the upper bound of the first range segment
+            Range<PositionImpl> range = individualDeletedMessages.asRanges().iterator().next();
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
-                                name, range);
-                    }
+            // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
+            if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
+                    .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
 
-                    newMarkDeletePosition = range.upperEndpoint();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
+                            name, range);
                 }
+
+                newMarkDeletePosition = range.upperEndpoint();
             }
 
             if (newMarkDeletePosition != null) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 5ad25f5..de98b60 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -156,6 +156,14 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
+        }
+
+        @Override
+        public void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx) {
+        }
+
+        @Override
         public void clearBacklog() throws InterruptedException, ManagedLedgerException {
         }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
new file mode 100644
index 0000000..aaf7cd1
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import java.nio.charset.Charset;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.testng.annotations.Test;
+
+public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
+
+    private static final Charset Encoding = Charsets.UTF_8;
+
+    @Test(timeOut = 20000)
+    void testMultiPositionDelete() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+
+        ManagedCursor c1 = ledger.openCursor("c1");
+        Position p0 = c1.getMarkDeletedPosition();
+        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
+        Position p5 = ledger.addEntry("dummy-entry-5".getBytes(Encoding));
+        Position p6 = ledger.addEntry("dummy-entry-6".getBytes(Encoding));
+        Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));
+
+        assertEquals(c1.getNumberOfEntries(), 7);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 7);
+
+        c1.delete(Lists.newArrayList(p2, p3, p5, p7));
+
+        assertEquals(c1.getNumberOfEntries(), 3);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+        assertEquals(c1.getMarkDeletedPosition(), p0);
+
+        c1.delete(Lists.newArrayList(p1));
+
+        assertEquals(c1.getNumberOfEntries(), 2);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+        assertEquals(c1.getMarkDeletedPosition(), p3);
+
+        c1.delete(Lists.newArrayList(p4, p6, p7));
+
+        assertEquals(c1.getNumberOfEntries(), 0);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+        assertEquals(c1.getMarkDeletedPosition(), p7);
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.