You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/06 08:09:04 UTC

[pulsar] branch branch-2.7 updated (d1aa759ede4 -> 6f74686e8f2)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from d1aa759ede4 [ServerCnx] Improve error logging for topic not found (#13950) (#14893)
     new 1604093da49 Exclude inner classes in surefire (#9875)
     new 5f5b45e1735 Clean up individually deleted messages before the mark-delete position (#14261)
     new 2bdfe881f3d If mark-delete operation fails, mark the cursor as "dirty" (#14256)
     new 6f74686e8f2 [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 49 ++++++++++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 86 ++++++++++++++++++++--
 pom.xml                                            |  2 +-
 3 files changed, 124 insertions(+), 13 deletions(-)


[pulsar] 04/04: [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6f74686e8f27ba4ef56ff05e8f3fdc83b3447426
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Apr 5 22:44:25 2022 +0300

    [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031)
    
    - missed updates can lead to the subscription and consuming getting stuck
    
    (cherry picked from commit ad2f397fb9bb1d6f90a980e68f0161ed32900309)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 31 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index cb4847ba679..87fe0b94c26 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1652,7 +1652,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // Apply rate limiting to mark-delete operations
         if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
             isDirty = true;
-            lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null);
+            updateLastMarkDeleteEntryToLatest(newPosition, properties);
             callback.markDeleteComplete(ctx);
             return;
         }
@@ -1705,7 +1705,14 @@ public class ManagedCursorImpl implements ManagedCursor {
         // ledger is postponed to when the counter goes to 0.
         PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.incrementAndGet(this);
 
-        lastMarkDeleteEntry = mdEntry;
+        LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+            if (last != null && last.newPosition.compareTo(mdEntry.newPosition) > 0) {
+                // keep the current value since it's later than mdEntry.newPosition
+                return last;
+            } else {
+                return mdEntry;
+            }
+        });
 
         persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
             @Override
@@ -1962,9 +1969,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // Apply rate limiting to mark-delete operations
         if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
             isDirty = true;
-            PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
-            LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
-                    last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null));
+            updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
             callback.deleteComplete(ctx);
             return;
         }
@@ -1996,6 +2001,22 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    // update lastMarkDeleteEntry field unless its current newPosition is later than the given newPosition
+    private void updateLastMarkDeleteEntryToLatest(final PositionImpl newPosition,
+                                                   final Map<String, Long> properties) {
+        LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
+            if (last != null && last.newPosition.compareTo(newPosition) > 0) {
+                // keep current value, don't update
+                return last;
+            } else {
+                // use the given properties; when null, reuse the previous entry's properties (or an empty map)
+                Map<String, Long> propertiesToUse =
+                        properties != null ? properties : (last != null ? last.properties : Collections.emptyMap());
+                return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
+            }
+        });
+    }
+
     /**
      * Given a list of entries, filter out the entries that have already been individually deleted.
      *


[pulsar] 02/04: Clean up individually deleted messages before the mark-delete position (#14261)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f5b45e1735d81dc182c36e66d43165c0251572d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Feb 13 21:22:38 2022 -0800

    Clean up individually deleted messages before the mark-delete position (#14261)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 17 ++++++++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 34 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2a908974606..f4c450883a6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1517,7 +1517,9 @@ public class ManagedCursorImpl implements ManagedCursor {
      */
     PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
         if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
-            throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
+            throw new IllegalArgumentException(
+                    "Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition
+                            + " -- attempted mark delete: " + newMarkDeletePosition);
         }
 
         PositionImpl oldMarkDeletePosition = markDeletePosition;
@@ -1914,6 +1916,19 @@ public class ManagedCursorImpl implements ManagedCursor {
             // mark-delete to the upper bound of the first range segment
             Range<PositionImpl> range = individualDeletedMessages.firstRange();
 
+            // If the upper bound is before the mark-delete position, we need to move ahead as these
+            // individualDeletedMessages are now irrelevant
+            if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
+                individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
+                        markDeletePosition.getEntryId());
+                range = individualDeletedMessages.firstRange();
+            }
+
+            if (range == null) {
+                // The set was completely cleaned up now
+                return;
+            }
+
             // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
             if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
                     .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5a56026f0ef..b6c40bcfc69 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -29,11 +29,11 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -45,7 +45,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -58,8 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-
-import io.netty.buffer.ByteBufAllocator;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -85,8 +82,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
+import org.apache.pulsar.metadata.api.Stat;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.awaitility.Awaitility;
@@ -3417,7 +3415,6 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                         ManagedCursor c2 = ledger2.openCursor("c");
 
                         assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
-
                     } finally {
                         factory2.shutdown();
                     }
@@ -3427,6 +3424,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         dirtyFactory.shutdown();
     }
 
+    @Test
+    public void testConsistencyOfIndividualMessages() throws Exception {
+        ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c");
+
+        PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]);
+        c1.markDelete(p1);
+
+        // Artificially add a position that is before the current mark-delete position
+        LongPairRangeSet<PositionImpl> idm = c1.getIndividuallyDeletedMessagesSet();
+        idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        for (int i = 0; i < 20; i++) {
+            c1.delete(positions.get(i));
+        }
+
+        assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
+        assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+    }
+
     @Test
     public void testCursorGetBacklog() throws Exception {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();


[pulsar] 01/04: Exclude inner classes in surefire (#9875)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1604093da499e75194428f7e95ea4cacd62608e4
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Wed Mar 10 21:05:31 2021 -0800

    Exclude inner classes in surefire (#9875)
    
    Co-authored-by: Ali Ahmed <al...@splunk.com>
    (cherry picked from commit 782372d8f51984f18de3b102cfb3c00c6dedb2bb)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 49c43b799d1..84ef2379752 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1366,7 +1366,7 @@ flexible messaging model and an intuitive client API.</description>
                 <include>${include}</include>
             </includes>
             <excludes>
-                <exclude>${exclude}</exclude>
+                <exclude>**/*$*,${exclude}</exclude>
             </excludes>
             <groups>${groups}</groups>
             <excludedGroups>${excludedGroups}</excludedGroups>


[pulsar] 03/04: If mark-delete operation fails, mark the cursor as "dirty" (#14256)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2bdfe881f3d4156feca0098aa83ef7a5c914ed79
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Feb 13 03:05:52 2022 -0800

    If mark-delete operation fails, mark the cursor as "dirty" (#14256)
    
    (cherry picked from commit 8928c3496a61c588b50461d6adaab089dd421619)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 52 ++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f4c450883a6..cb4847ba679 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1750,6 +1750,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
+                isDirty = true;
                 log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(),
                         ManagedCursorImpl.this, mdEntry.newPosition);
                 if (log.isDebugEnabled()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index b6c40bcfc69..d9f629d5ac4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -3424,6 +3425,57 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         dirtyFactory.shutdown();
     }
 
+
+
+    @Test
+    public void testFlushCursorAfterError() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setThrottleMarkDelete(1.0);
+
+        ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
+        factoryConfig.setCursorPositionFlushSeconds(1);
+
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), factoryConfig);
+        ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config);
+        ManagedCursor c1 = ledger1.openCursor("c");
+        List<Position> positions = new ArrayList<>();
+
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        // Simulate BK write error
+        bkc.failNow(BKException.Code.NotEnoughBookiesException);
+        zkc.setAlwaysFail(Code.BADVERSION);
+
+        try {
+            c1.markDelete(positions.get(positions.size() - 1));
+            fail("should have failed");
+        } catch (ManagedLedgerException e) {
+            // Expected
+        }
+
+        zkc.unsetAlwaysFail();
+
+        // In memory position is updated
+        assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+
+        Awaitility.await()
+                // Give the flush a chance to be triggered automatically.
+                // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting
+                .pollDelay(Duration.ofMillis(2000))
+                .untilAsserted(() -> {
+                    // Abruptly re-open the managed ledger without graceful close
+                    @Cleanup("shutdown")
+                    ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+                    ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
+                    ManagedCursor c2 = ledger2.openCursor("c");
+
+                    assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+                });
+    }
+
     @Test
     public void testConsistencyOfIndividualMessages() throws Exception {
         ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");