You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/27 20:28:17 UTC

(pulsar) branch branch-3.2 updated (21e56955d5e -> f56c383a531)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 21e56955d5e [fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (#22252)
     new cc79803cb7e [fix][misc] Make ConcurrentBitSet thread safe (#22361)
     new f56c383a531 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../persistent/PersistentMessageExpiryMonitor.java |   4 +-
 .../service/PersistentMessageFinderTest.java       |  51 ++-
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++++++++++++++++++--
 3 files changed, 378 insertions(+), 40 deletions(-)


(pulsar) 01/02: [fix][misc] Make ConcurrentBitSet thread safe (#22361)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cc79803cb7e4caf732e37e1879ae49f3dfbbd290
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Mar 27 09:16:22 2024 -0700

    [fix][misc] Make ConcurrentBitSet thread safe (#22361)
    
    (cherry picked from commit edd0076bd83f01a5fcbe81c8396667014f0fc36e)
---
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++++++++++++++++++--
 1 file changed, 331 insertions(+), 32 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 23842fe5b55..a37628cb300 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections;
 
 import java.util.BitSet;
 import java.util.concurrent.locks.StampedLock;
-import lombok.EqualsAndHashCode;
+import java.util.stream.IntStream;
 
 /**
- * Safe multithreaded version of {@code BitSet}.
+ * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access.
+ * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant.
+ * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner.
  */
-@EqualsAndHashCode(callSuper = true)
 public class ConcurrentBitSet extends BitSet {
 
     private static final long serialVersionUID = 1L;
@@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet {
      * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range
      * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
      *
-     * @param nbits
-     *            the initial size of the bit set
-     * @throws NegativeArraySizeException
-     *             if the specified initial size is negative
+     * @param nbits the initial size of the bit set
+     * @throws NegativeArraySizeException if the specified initial size is negative
      */
     public ConcurrentBitSet(int nbits) {
         super(nbits);
@@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet {
 
     @Override
     public void set(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear() {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear();
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public int nextSetBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        super.set(bitIndex);
+        int nextSetBit = super.nextSetBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                super.set(bitIndex);
+                nextSetBit = super.nextSetBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
+        return nextSetBit;
     }
 
     @Override
-    public void set(int fromIndex, int toIndex) {
+    public int nextClearBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        super.set(fromIndex, toIndex);
+        int nextClearBit = super.nextClearBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                super.set(fromIndex, toIndex);
+                nextClearBit = super.nextClearBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
+        return nextClearBit;
     }
 
     @Override
-    public int nextSetBit(int fromIndex) {
+    public int previousSetBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.nextSetBit(fromIndex);
+        int previousSetBit = super.previousSetBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.nextSetBit(fromIndex);
+                previousSetBit = super.previousSetBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return previousSetBit;
     }
 
     @Override
-    public int nextClearBit(int fromIndex) {
+    public int previousClearBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.nextClearBit(fromIndex);
+        int previousClearBit = super.previousClearBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.nextClearBit(fromIndex);
+                previousClearBit = super.previousClearBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return previousClearBit;
     }
 
     @Override
-    public int previousSetBit(int fromIndex) {
+    public boolean isEmpty() {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.previousSetBit(fromIndex);
+        boolean isEmpty = super.isEmpty();
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.previousSetBit(fromIndex);
+                isEmpty = super.isEmpty();
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return isEmpty;
     }
 
     @Override
-    public int previousClearBit(int fromIndex) {
+    public int cardinality() {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.previousClearBit(fromIndex);
+        int cardinality = super.cardinality();
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.previousClearBit(fromIndex);
+                cardinality = super.cardinality();
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return cardinality;
     }
 
     @Override
-    public boolean isEmpty() {
+    public int size() {
         long stamp = rwLock.tryOptimisticRead();
-        boolean isEmpty = super.isEmpty();
+        int size = super.size();
         if (!rwLock.validate(stamp)) {
             // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                isEmpty = super.isEmpty();
+                size = super.size();
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return isEmpty;
+        return size;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        long stamp = rwLock.tryOptimisticRead();
+        byte[] byteArray = super.toByteArray();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                byteArray = super.toByteArray();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return byteArray;
+    }
+
+    @Override
+    public long[] toLongArray() {
+        long stamp = rwLock.tryOptimisticRead();
+        long[] longArray = super.toLongArray();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                longArray = super.toLongArray();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return longArray;
+    }
+
+    @Override
+    public void flip(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.flip(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void flip(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.flip(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int bitIndex, boolean value) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(bitIndex, value);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int fromIndex, int toIndex, boolean value) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(fromIndex, toIndex, value);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public BitSet get(int fromIndex, int toIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        BitSet bitSet = super.get(fromIndex, toIndex);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                bitSet = super.get(fromIndex, toIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return bitSet;
+    }
+
+    /**
+     * Thread-safe version of {@code length()}.
+     * StampedLock is not reentrant and that's why the length() method is not overridden. Overriding length() method
+     * would require to use a reentrant lock which would be less performant.
+     *
+     * @return length of the bit set
+     */
+    public int safeLength() {
+        long stamp = rwLock.tryOptimisticRead();
+        int length = super.length();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                length = super.length();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return length;
+    }
+
+    @Override
+    public boolean intersects(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            return super.intersects(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void and(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.and(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void or(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.or(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void xor(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.xor(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void andNot(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.andNot(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Returns the clone of the internal wrapped {@code BitSet}.
+     * This won't be a clone of the {@code ConcurrentBitSet} object.
+     *
+     * @return a clone of the internal wrapped {@code BitSet}
+     */
+    @Override
+    public Object clone() {
+        long stamp = rwLock.tryOptimisticRead();
+        BitSet clonedBitSet = (BitSet) super.clone();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                clonedBitSet = (BitSet) super.clone();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return clonedBitSet;
+    }
+
+    @Override
+    public String toString() {
+        long stamp = rwLock.tryOptimisticRead();
+        String str = super.toString();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                str = super.toString();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return str;
+    }
+
+    /**
+     * This operation is not supported on {@code ConcurrentBitSet}.
+     */
+    @Override
+    public IntStream stream() {
+        throw new UnsupportedOperationException("stream is not supported");
+    }
+
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof ConcurrentBitSet)) {
+            return false;
+        }
+        long stamp = rwLock.tryOptimisticRead();
+        boolean isEqual = super.equals(o);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                isEqual = super.equals(o);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return isEqual;
+    }
+
+    public int hashCode() {
+        long stamp = rwLock.tryOptimisticRead();
+        int hashCode = super.hashCode();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                hashCode = super.hashCode();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return hashCode;
     }
 }


(pulsar) 02/02: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f56c383a5312ef9186880662860b8d2f38b3c0c8
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Mar 28 03:42:15 2024 +0800

    [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)
    
    (cherry picked from commit f77fe5f099f7ecc334509db07bba477c4226cf19)
---
 .../persistent/PersistentMessageExpiryMonitor.java |  4 +-
 .../service/PersistentMessageFinderTest.java       | 51 +++++++++++++++++++---
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index ac391c10503..2478a7a2538 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -121,8 +121,8 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
                             managedLedger.getLedgersInfo().lastKey(), true);
             MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
             for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) {
-                if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds,
-                        ledgerInfo.getTimestamp())) {
+                if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L
+                        || !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
                     break;
                 }
                 info = ledgerInfo;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index ace552a55a7..6883c0467e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -46,7 +44,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicReference;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -72,11 +73,10 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-
 @Test(groups = "broker")
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
@@ -463,6 +463,45 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
     }
 
+    @Test
+    public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable {
+        final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
+        int maxTTLSeconds = 1;
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+        // set client clock to 10 days later
+        long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10);
+        for (int i = 0; i < 7; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp));
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        PersistentTopic mock = mock(PersistentTopic.class);
+        when(mock.getName()).thenReturn("topicname");
+        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+        AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
+                (AsyncCallbacks.MarkDeleteCallback) spy(
+                        FieldUtils.readDeclaredField(monitor, "markDeleteCallback", true));
+        FieldUtils.writeField(monitor, "markDeleteCallback", markDeleteCallback, true);
+
+        AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
+        Mockito.doAnswer(invocation -> {
+            ManagedLedgerException argument = invocation.getArgument(0, ManagedLedgerException.class);
+            throwableAtomicReference.set(argument);
+            return invocation.callRealMethod();
+        }).when(markDeleteCallback).markDeleteFailed(any(), any());
+
+        PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
+        c1.markDelete(position);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
+        monitor.expireMessages(maxTTLSeconds);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+
+        Assert.assertNull(throwableAtomicReference.get());
+    }
+
     @Test
     void testMessageExpiryWithPosition() throws Exception {
         final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";