Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:30 UTC

[pulsar] branch branch-2.10 updated (f21f7d21513 -> d50201d4183)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from f21f7d21513 Fix NPE when TableView handles null value message (#15951)
     new eed7825d8ee [Transaction] Fix cursor readPosition is bigger than maxPosition in OpReadEntry (#14667)
     new f03b1686c95 [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)
     new a5034d6ccd7 Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)
     new fa78cf8f45b [optimize][txn] Optimize transaction lowWaterMark to clean useless data faster (#15592)
     new 1bcb7fbe360 [fix][broker-common] expose configurationMetadataStore and localMetadataStore (#15661)
     new 2e5c984b201 [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)
     new d1db33ee114 [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
     new 1e026944556 Sync topicPublishRateLimiter update (#15599)
     new fa862e30d69 fix bug in getNumberOfEntriesInStorage (#15627)
     new b4c704e1f6c Fix NPE in MessageDeduplication. (#15820)
     new 784c9c83be2 Fix avro conversion order of registration (#15863)
     new c13805e4ef1 Avoid contended synchronized block on topic load (#15883)
     new f8bc91f13fc [Revert] [#15483] Remove sensitive msg from consumer/producer stats log (#15817)
     new 5bf69dc1cd7 [fix][broker]Fast return if ack cumulative illegal (#15695)
     new 7a62dfad8d5 [improve][tiered storage] Reduce cpu usage when offloading the ledger (#15063)
     new 1a7a157df89 [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
     new d50201d4183 Fix cherry-pick issue

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       |   9 +-
 .../apache/bookkeeper/mledger/util/RangeCache.java |  12 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  43 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  29 ++
 .../bookkeeper/mledger/util/RangeCacheTest.java    |  29 +-
 .../pulsar/broker/resources/PulsarResources.java   |   3 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  40 +--
 .../pulsar/broker/service/AbstractTopic.java       |  41 +--
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +
 .../service/persistent/MessageDeduplication.java   |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   5 +-
 .../buffer/impl/TopicTransactionBuffer.java        |  72 +++--
 .../pendingack/impl/PendingAckHandleImpl.java      |  49 ++-
 .../broker/service/MessageCumulativeAckTest.java   | 199 +++++++++++++
 .../service/persistent/MessageDuplicationTest.java |   7 +
 .../buffer/TransactionLowWaterMarkTest.java        | 140 ++++++++-
 .../pendingack/PendingAckPersistentTest.java       |  20 +-
 .../client/impl/conf/ClientConfigurationData.java  |   7 -
 .../pulsar/client/impl/schema/AvroSchema.java      |   3 +-
 .../impl/conf/ClientConfigurationDataTest.java     |   1 -
 .../pulsar/client/impl/schema/AvroSchemaTest.java  |  21 ++
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |   6 +-
 .../io/kafka/connect/KafkaConnectSinkTest.java     |   3 +
 .../impl/BlockAwareSegmentInputStreamImpl.java     | 116 +++++++-
 .../jcloud/provider/JCloudBlobStoreProvider.java   |  54 +++-
 .../provider/TieredStorageConfiguration.java       |  13 +
 .../impl/BlockAwareSegmentInputStreamTest.java     | 328 +++++++++++++++++++--
 .../provider/JCloudBlobStoreProviderTests.java     |  31 +-
 .../provider/TieredStorageConfigurationTests.java  |  17 ++
 30 files changed, 1125 insertions(+), 179 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java


[pulsar] 02/17: [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f03b1686c9534379e68a515df0591f78af8734b7
Author: 赵延 <ho...@apache.org>
AuthorDate: Mon Apr 11 12:34:36 2022 +0800

    [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)
    
    (cherry picked from commit 93761284b9f6875da0403f5fedb6ccbfbbcd7315)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java   | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 178ee2dedab..006accaf252 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -137,12 +137,8 @@ class OpReadEntry implements ReadEntriesCallback {
         // op readPosition is smaller or equals maxPosition then can read again
         if (entries.size() < count && cursor.hasMoreEntries()
                 && maxPosition.compareTo(readPosition) > 0) {
-            // We still have more entries to read from the next ledger, schedule a new async operation
-            if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
-                cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
-            }
 
-            // Schedule next read in a different thread
+            // We still have more entries to read from the next ledger, schedule a new async operation
             cursor.ledger.getExecutor().execute(safeRun(() -> {
                 readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);


[pulsar] 12/17: Avoid contended synchronized block on topic load (#15883)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c13805e4ef12c200b5abfe3c76abf86fa827f6c0
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 2 08:34:55 2022 -0700

    Avoid contended synchronized block on topic load (#15883)
    
    (cherry picked from commit 7d2fdea7749d72b58def4045be3f295e0ee4f04d)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 40 ++++++++++++----------
 1 file changed, 21 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e1ffbe757ad..0acc4ec6956 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1246,33 +1246,35 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         });
     }
 
-    public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
+    public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
             throws PulsarServerException {
         try {
             if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
-                        offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
-
-                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
-                        offloadPolicies.getManagedLedgerOffloadDriver());
-                try {
-                    return offloaderFactory.create(
-                        offloadPolicies,
-                        ImmutableMap.of(
-                            LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
-                            LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
-                            LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
-                        ),
-                        schemaStorage,
-                        getOffloaderScheduler(offloadPolicies));
-                } catch (IOException ioe) {
-                    throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
+                synchronized (this) {
+                    Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+                            offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+                    LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
+                            offloadPolicies.getManagedLedgerOffloadDriver());
+                    try {
+                        return offloaderFactory.create(
+                            offloadPolicies,
+                            ImmutableMap.of(
+                                LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
+                                LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(),
+                                LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName()
+                            ),
+                            schemaStorage,
+                            getOffloaderScheduler(offloadPolicies));
+                    } catch (IOException ioe) {
+                        throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
+                    }
                 }
             } else {
-                LOG.info("No ledger offloader configured, using NULL instance");
+                LOG.debug("No ledger offloader configured, using NULL instance");
                 return NullLedgerOffloader.INSTANCE;
             }
         } catch (Throwable t) {
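
The key change here is lock scoping: createManagedLedgerOffloader is no longer a synchronized method, so the common path (no offload driver configured) returns NullLedgerOffloader.INSTANCE without ever contending for the PulsarService monitor, while offloader creation is still serialized inside a synchronized block. A minimal sketch of the same pattern, with hypothetical names rather than the actual Pulsar API:

    public class OffloaderHolder {
        private static final Object NULL_INSTANCE = new Object();
        private Object cached;

        public Object getOrCreate(String driver) {
            if (driver == null || driver.isEmpty()) {
                return NULL_INSTANCE;   // fast path: no lock taken
            }
            synchronized (this) {       // slow path: creation stays serialized
                if (cached == null) {
                    cached = expensiveCreate(driver);
                }
                return cached;
            }
        }

        private Object expensiveCreate(String driver) {
            return new Object();        // stand-in for loading an offloader
        }
    }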


[pulsar] 05/17: [fix][broker-common] expose configurationMetadataStore and localMetadataStore (#15661)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1bcb7fbe3607092ad5e41891c54abc2b61389937
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon May 23 09:51:27 2022 +0800

    [fix][broker-common] expose configurationMetadataStore and localMetadataStore (#15661)
    
    (cherry picked from commit e7dff4b5136f55b62ea1fa79073c8b5236564a33)
---
 .../main/java/org/apache/pulsar/broker/resources/PulsarResources.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index e206d5ad542..a087d8090d3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -47,8 +47,9 @@ public class PulsarResources {
     private final BookieResources bookieResources;
     @Getter
     private final TopicResources topicResources;
-
+    @Getter
     private final Optional<MetadataStore> localMetadataStore;
+    @Getter
     private final Optional<MetadataStore> configurationMetadataStore;
 
     public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) {
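
With @Getter added, Lombok generates accessors for both stores. An illustrative usage snippet (assuming a PulsarResources instance named resources; not taken from the patch):

    Optional<MetadataStore> local = resources.getLocalMetadataStore();
    Optional<MetadataStore> configuration = resources.getConfigurationMetadataStore();
    configuration.ifPresent(store -> {
        // e.g. read or watch configuration metadata directly
    });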


[pulsar] 15/17: [improve][tiered storage] Reduce cpu usage when offloading the ledger (#15063)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7a62dfad8d5ee29043e72e06161fbae6dc414a63
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Jun 6 16:31:14 2022 +0800

    [improve][tiered storage] Reduce cpu usage when offloading the ledger (#15063)
    
    * [improve][tiered storage] Reduce cpu usage when offloading the ledger
    ---
    
    *Motivation*
    
    When offloading a ledger, BlockAwareSegmentInputStreamImpl wraps the
    ledger handle so it can be consumed as a stream, and JClouds reads that
    stream as the payload to upload to storage. JClouds fills a buffer from
    the stream:
    https://github.com/apache/jclouds/blob/36f351cd18925d2bb27bf7ad2c5d75e555da377a/core/src/main/java/org/jclouds/io/ByteStreams2.java#L68
    
    In the current offload implementation, filling that buffer takes many
    separate read calls before the data is returned.
    After implementing read(byte[] b, int off, int len), CPU usage dropped by
    almost 10%.
    
    *Modifications*
    
    - Add read(byte[] b, int off, int len) implementation in the BlockAwareSegmentInputStreamImpl
    
    (cherry picked from commit 938ab7befc57a23e5a2bcb0f8bfe5c714c4d0018)
---
 .../impl/BlockAwareSegmentInputStreamImpl.java     | 116 +++++++-
 .../impl/BlockAwareSegmentInputStreamTest.java     | 328 +++++++++++++++++++--
 2 files changed, 409 insertions(+), 35 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index a4ffdea6509..e4f935c1113 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static com.google.common.base.Preconditions.checkState;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import java.io.IOException;
@@ -27,6 +28,7 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -44,6 +46,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
     private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
 
     static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
+    static final byte[] BLOCK_END_PADDING_BYTES =  Ints.toByteArray(0xFEDCDEAD);
+
+    private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128);
 
     private final ReadHandle ledger;
     private final long startEntryId;
@@ -65,6 +70,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
     static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
     // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
     private List<ByteBuf> entriesByteBuf = null;
+    private int currentOffset = 0;
+    private final AtomicBoolean close = new AtomicBoolean(false);
 
     public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
         this.ledger = ledger;
@@ -76,6 +83,52 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         this.entriesByteBuf = Lists.newLinkedList();
     }
 
+    private ByteBuf readEntries(int len) throws IOException {
+        checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
+        checkState(bytesReadOffset < blockSize);
+
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
+            entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
+        }
+
+        if (!entriesByteBuf.isEmpty()
+            && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
+            // always read from the first ByteBuf in the list, once read all of its content remove it.
+            ByteBuf entryByteBuf = entriesByteBuf.get(0);
+            int readableBytes = entryByteBuf.readableBytes();
+            int read = Math.min(readableBytes, len);
+            ByteBuf buf = entryByteBuf.slice(currentOffset, read);
+            buf.retain();
+            currentOffset += read;
+            entryByteBuf.readerIndex(currentOffset);
+            bytesReadOffset += read;
+
+            if (entryByteBuf.readableBytes() == 0) {
+                entryByteBuf.release();
+                entriesByteBuf.remove(0);
+                blockEntryCount++;
+                currentOffset = 0;
+            }
+
+            return buf;
+        } else {
+            // no space for a new entry or there are no more entries
+            // set data block full, return end padding
+            if (dataBlockFullOffset == blockSize) {
+                dataBlockFullOffset = bytesReadOffset;
+            }
+            paddingBuf.clear();
+            for (int i = 0; i < Math.min(len, paddingBuf.capacity()); i++) {
+                paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset)
+                    % BLOCK_END_PADDING_BYTES.length]);
+            }
+            return paddingBuf.retain();
+        }
+    }
+
     // read ledger entries.
     private int readEntries() throws IOException {
         checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
@@ -143,6 +196,46 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         }
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException("The given bytes are null");
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int offset = off;
+        int readLen = len;
+        int readBytes = 0;
+        // reading header
+        if (dataBlockHeaderStream.available() > 0) {
+            int read = dataBlockHeaderStream.read(b, off, len);
+            offset += read;
+            readLen -= read;
+            readBytes += read;
+            bytesReadOffset += read;
+        }
+        if (readLen == 0) {
+            return readBytes;
+        }
+
+        // reading ledger entries
+        if (bytesReadOffset < blockSize) {
+            readLen = Math.min(readLen, blockSize - bytesReadOffset);
+            ByteBuf readEntries = readEntries(readLen);
+            int read = readEntries.readableBytes();
+            readEntries.readBytes(b, offset, read);
+            readEntries.release();
+            readBytes += read;
+            return readBytes;
+        }
+
+        // reached end
+        return -1;
+    }
+
     @Override
     public int read() throws IOException {
         // reading header
@@ -162,11 +255,20 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
 
     @Override
     public void close() throws IOException {
-        super.close();
-        dataBlockHeaderStream.close();
-        if (!entriesByteBuf.isEmpty()) {
-            entriesByteBuf.forEach(buf -> buf.release());
-            entriesByteBuf.clear();
+        // The close method can be triggered twice in the
+        // BlobStoreManagedLedgerOffloader#offload method: once by the try-with-resources
+        // block that owns the stream, and (as observed while debugging) again by
+        // writeBlobStore.uploadMultipartPart inside the offload method.
+        // The close flag guards against releasing paddingBuf twice.
+        if (!close.compareAndSet(false, true)) {
+            super.close();
+            dataBlockHeaderStream.close();
+            if (!entriesByteBuf.isEmpty()) {
+                entriesByteBuf.forEach(buf -> buf.release());
+                entriesByteBuf.clear();
+            }
+            paddingBuf.clear();
+            paddingBuf.release();
         }
     }
 
@@ -185,6 +287,10 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         return blockSize;
     }
 
+    public int getDataBlockFullOffset() {
+        return dataBlockFullOffset;
+    }
+
     @Override
     public int getBlockEntryCount() {
         return blockEntryCount;
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
index 5cf6bd56500..0cd4bbd70a9 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
@@ -28,6 +29,7 @@ import com.google.common.primitives.Longs;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -44,6 +46,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
@@ -206,8 +209,16 @@ public class BlockAwareSegmentInputStreamTest {
         }
     }
 
-    @Test
-    public void testHaveEndPadding() throws Exception {
+    @DataProvider(name = "useBufferRead")
+    public static Object[][] useBufferRead() {
+        return new Object[][]{
+            {Boolean.TRUE},
+            {Boolean.FALSE}
+        };
+    }
+
+    @Test(dataProvider = "useBufferRead")
+    public void testHaveEndPadding(boolean useBufferRead) throws Exception {
         int ledgerId = 1;
         int entrySize = 8;
         int lac = 160;
@@ -226,7 +237,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -240,9 +256,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -256,13 +281,36 @@ public class BlockAwareSegmentInputStreamTest {
         int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);
         assertEquals(left, 5);
         byte padding[] = new byte[left];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {
+                offset += ret;
+            }
+            assertEquals(inputStream.read(padding, 0, padding.length), -1);
+        } else {
+            int len = left;
+            int offset = 0;
+            byte[] buf = new byte[4];
+            while (len > 0) {
+                int ret = inputStream.read(buf);
+                for (int i = 0; i < ret; i++) {
+                    padding[offset++] = buf[i];
+                }
+                len -= ret;
+            }
+        }
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
             assertEquals(Integer.toHexString(paddingBuf.readInt()),
                          Integer.toHexString(0xFEDCDEAD)));
 
         // 4. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
@@ -272,8 +320,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testNoEndPadding() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testNoEndPadding(boolean useBufferRead) throws Exception {
         int ledgerId = 1;
         int entrySize = 8;
         int lac = 120;
@@ -293,7 +341,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -307,9 +360,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -324,6 +386,11 @@ public class BlockAwareSegmentInputStreamTest {
         assertEquals(left, 0);
 
         // 4. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
@@ -333,8 +400,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testReadTillLac() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testReadTillLac(boolean useBufferRead) throws Exception {
         // simulate last data block read.
         int ledgerId = 1;
         int entrySize = 8;
@@ -354,7 +421,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -368,9 +440,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -385,6 +466,11 @@ public class BlockAwareSegmentInputStreamTest {
         assertEquals(left, 0);
 
         // 4. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
@@ -394,8 +480,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testNoEntryPutIn() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testNoEntryPutIn(boolean useBufferRead) throws Exception {
         // simulate first entry size over the block size budget, it shouldn't be added.
         // 2 entries, each with bigger size than block size, so there should no entry added into block.
         int ledgerId = 1;
@@ -416,7 +502,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -424,13 +515,36 @@ public class BlockAwareSegmentInputStreamTest {
 
         // 2. since no entry put in, it should only get padding after header.
         byte padding[] = new byte[blockSize - DataBlockHeaderImpl.getDataStartOffset()];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {
+                offset += ret;
+            }
+            assertEquals(inputStream.read(padding, 0, padding.length), -1);
+        } else {
+            int len = padding.length;
+            int offset = 0;
+            byte[] buf = new byte[4];
+            while (len > 0) {
+                int ret = inputStream.read(buf);
+                for (int i = 0; i < ret; i++) {
+                    padding[offset++] = buf[i];
+                }
+                len -= ret;
+            }
+        }
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
             assertEquals(Integer.toHexString(paddingBuf.readInt()),
                          Integer.toHexString(0xFEDCDEAD)));
 
         // 3. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), 0);
@@ -440,8 +554,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
-    @Test
-    public void testPaddingOnLastBlock() throws Exception {
+    @Test(dataProvider = "useBufferRead")
+    public void testPaddingOnLastBlock(boolean useBufferRead) throws Exception {
         int ledgerId = 1;
         int entrySize = 1000;
         int lac = 0;
@@ -460,7 +574,12 @@ public class BlockAwareSegmentInputStreamTest {
         // verify read inputStream
         // 1. read header. 128
         byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
-        ByteStreams.readFully(inputStream, headerB);
+        if (useBufferRead) {
+            int ret = inputStream.read(headerB, 0, DataBlockHeaderImpl.getDataStartOffset());
+            assertEquals(DataBlockHeaderImpl.getDataStartOffset(), ret);
+        } else {
+            ByteStreams.readFully(inputStream, headerB);
+        }
         DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
         assertEquals(headerRead.getBlockLength(), blockSize);
         assertEquals(headerRead.getFirstEntryId(), 0);
@@ -474,9 +593,18 @@ public class BlockAwareSegmentInputStreamTest {
                 byte lengthBuf[] = new byte[4];
                 byte entryIdBuf[] = new byte[8];
                 byte content[] = new byte[entrySize];
-                inputStream.read(lengthBuf);
-                inputStream.read(entryIdBuf);
-                inputStream.read(content);
+                if (useBufferRead) {
+                    int read = inputStream.read(lengthBuf, 0, 4);
+                    assertEquals(read, 4);
+                    read = inputStream.read(entryIdBuf, 0, 8);
+                    assertEquals(read, 8);
+                    read = inputStream.read(content, 0, entrySize);
+                    assertEquals(read, entrySize);
+                } else {
+                    inputStream.read(lengthBuf);
+                    inputStream.read(entryIdBuf);
+                    inputStream.read(content);
+                }
 
                 assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
                 assertEquals(i, Longs.fromByteArray(entryIdBuf));
@@ -490,13 +618,36 @@ public class BlockAwareSegmentInputStreamTest {
         int consumedBytes = DataBlockHeaderImpl.getDataStartOffset()
             + expectedEntryCount * (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE);
         byte padding[] = new byte[blockSize - consumedBytes];
-        inputStream.read(padding);
+        if (useBufferRead) {
+            int ret = 0;
+            int offset = 0;
+            while ((ret = inputStream.read(padding, offset, padding.length - offset)) > 0) {
+                offset += ret;
+            }
+            assertEquals(inputStream.read(padding, 0, padding.length), -1);
+        } else {
+            int len = blockSize - consumedBytes;
+            int offset = 0;
+            byte[] buf = new byte[4];
+            while (len > 0) {
+                int ret = inputStream.read(buf);
+                for (int i = 0; i < ret; i++) {
+                    padding[offset++] = buf[i];
+                }
+                len -= ret;
+            }
+        }
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
                 assertEquals(Integer.toHexString(paddingBuf.readInt()),
                              Integer.toHexString(0xFEDCDEAD)));
 
         // 3. reach end.
+        if (useBufferRead) {
+            byte[] b = new byte[4];
+            int ret = inputStream.read(b, 0, 4);
+            assertEquals(ret, -1);
+        }
         assertEquals(inputStream.read(), -1);
 
         assertEquals(inputStream.getBlockEntryCount(), 1);
@@ -530,4 +681,121 @@ public class BlockAwareSegmentInputStreamTest {
         }
     }
 
+    @Test
+    public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException {
+        int ledgerId = 1;
+        int entrySize = 10000;
+        int lac = 0;
+
+        Random r = new Random(0);
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt());
+
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2;
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+
+        int bytesRead = 0;
+        int ret;
+        int offset = 0;
+        int resetOffsetCount = 0;
+        byte[] buf = new byte[1024];
+        while ((ret = inputStream.read(buf, offset, buf.length - offset)) > 0) {
+            bytesRead += ret;
+            int currentOffset = offset;
+            offset = (offset + ret) % buf.length;
+            if (offset < currentOffset) {
+                resetOffsetCount++;
+            }
+        }
+        assertEquals(bytesRead, blockSize);
+        assertNotEquals(resetOffsetCount, 0);
+    }
+
+    // This test is for testing the read(byte[] buf, int off, int len) method can work properly
+    // on the offset not 0.
+    @Test
+    public void testReadTillLacWithSmallBuffer() throws Exception {
+        // simulate last data block read.
+        int ledgerId = 1;
+        int entrySize = 8;
+        int lac = 89;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+
+        // set block size equals to (header + lac_entry) size.
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * (entrySize + 4 + 8);
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
+
+        // verify get methods
+        assertEquals(inputStream.getLedger(), readHandle);
+        assertEquals(inputStream.getStartEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        // verify read inputStream
+        // 1. read header. 128
+        byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+        // read twice to test the offset not 0 case
+        int ret = inputStream.read(headerB, 0, 66);
+        assertEquals(ret, 66);
+        ret = inputStream.read(headerB, 66, headerB.length - 66);
+        assertEquals(headerB.length - 66, ret);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
+        assertEquals(headerRead.getBlockLength(), blockSize);
+        assertEquals(headerRead.getFirstEntryId(), 0);
+
+        byte[] entryData = new byte[entrySize];
+        Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding
+
+        // 2. read Ledger entries. 96 * 20
+        IntStream.range(0, expectedEntryCount).forEach(i -> {
+            try {
+                byte lengthBuf[] = new byte[4];
+                byte entryIdBuf[] = new byte[8];
+                byte content[] = new byte[entrySize];
+
+                int read = inputStream.read(lengthBuf, 0, 4);
+                assertEquals(read, 4);
+                read = inputStream.read(entryIdBuf, 0, 8);
+                assertEquals(read, 8);
+
+                Random random = new Random(System.currentTimeMillis());
+                int o = 0;
+                int totalRead = 0;
+                int maxReadTime = 10;
+                while (o != content.length) {
+                    int r;
+                    if (maxReadTime-- == 0) {
+                        r = entrySize - o;
+                    } else {
+                        r = random.nextInt(entrySize - o);
+                    }
+                    read = inputStream.read(content, o, r);
+                    totalRead += read;
+                    o += r;
+                }
+                assertEquals(totalRead, entrySize);
+
+                assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
+                assertEquals(i, Longs.fromByteArray(entryIdBuf));
+                assertArrayEquals(entryData, content);
+            } catch (Exception e) {
+                fail("meet exception", e);
+            }
+        });
+
+        // 3. should have no padding
+        int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  expectedEntryCount * (entrySize + 4 + 8);
+        assertEquals(left, 0);
+        assertEquals(inputStream.getBlockSize(), inputStream.getDataBlockFullOffset());
+
+        // 4. reach end.
+        byte[] b = new byte[4];
+        ret = inputStream.read(b, 0, 4);
+        assertEquals(ret, -1);
+
+        assertEquals(inputStream.getBlockEntryCount(), expectedEntryCount);
+        assertEquals(inputStream.getBlockEntryBytesCount(), entrySize * expectedEntryCount);
+        assertEquals(inputStream.getEndEntryId(), expectedEntryCount - 1);
+
+        inputStream.close();
+    }
 }
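
For context on the CPU saving described in the motivation: InputStream's default read(byte[], int, int) loops over the single-byte read(), so each buffered read by JClouds previously cost one method call per byte. Overriding the bulk read copies a whole chunk per call. A minimal, self-contained sketch of that pattern (not the Pulsar implementation):

    import java.io.InputStream;

    class BulkReadStream extends InputStream {
        private final byte[] data;
        private int pos;

        BulkReadStream(byte[] data) {
            this.data = data;
        }

        @Override
        public int read() {
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            if (len == 0) {
                return 0;
            }
            if (pos >= data.length) {
                return -1;                               // end of stream
            }
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);      // one copy instead of n calls
            pos += n;
            return n;
        }
    }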


[pulsar] 16/17: [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1a7a157df8971618c42d2e83ddab1c5c2c81c179
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon Jun 6 20:49:17 2022 +0200

    [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
    
    (cherry picked from commit 49ee8a6bf4571d39adf0e942fc6bb04d9daa1290)
---
 .../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java   | 6 +++---
 .../org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java    | 3 +++
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 232d8c092ac..502154065d9 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -243,7 +243,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
-        final String topic = sourceRecord.getTopicName().orElse(topicName);
+        final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
         final Object key;
         final Object value;
         final Schema keySchema;
@@ -290,7 +290,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             // keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
             timestamp = sourceRecord.getMessage().get().getPublishTime();
         }
-        return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
+        return new SinkRecord(topic,
                 partition,
                 keySchema,
                 key,
@@ -303,7 +303,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
 
     @VisibleForTesting
     protected long currentOffset(String topic, int partition) {
-        return taskContext.currentOffset(topic, partition);
+        return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
     }
 
     // Replace all non-letter, non-digit characters with underscore.
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 8d08ebcef87..1fba098a228 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -74,6 +74,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -197,6 +198,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.write(record);
         sink.flush();
 
+        assertTrue(sink.currentOffset("persistent://a-b/c-d/fake-topic.a", 0) > 0L);
+
         assertEquals(status.get(), 1);
         assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");
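
The fix makes the sanitized name consistent end to end: toSinkRecord sanitizes the topic once, and currentOffset applies the same sanitization before the lookup, so the name stored in the SinkRecord and the name used for the offset query always match. A hedged stand-in for the sanitize step, consistent with the "replace all non-letter, non-digit characters with underscore" comment above but not the exact Pulsar code:

    static String sanitizeNameIfNeeded(String name, boolean sanitize) {
        // Replace every character that is not a letter or digit with '_'.
        return sanitize ? name.replaceAll("[^a-zA-Z0-9]", "_") : name;
    }

    // "persistent://a-b/c-d/fake-topic.a" -> "persistent___a_b_c_d_fake_topic_a"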
 


[pulsar] 08/17: Sync topicPublishRateLimiter update (#15599)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1e026944556a4169047d1c11d538e229ea934923
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sat May 28 10:53:34 2022 +0800

    Sync topicPublishRateLimiter update (#15599)
    
    (cherry picked from commit 51e727f25375fae1fc003370f0b7f113160f7529)
---
 .../pulsar/broker/service/AbstractTopic.java       | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 04fb3ca952c..4ed89908651 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -109,6 +109,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     protected volatile boolean schemaValidationEnforced = false;
 
     protected volatile PublishRateLimiter topicPublishRateLimiter;
+    private final Object topicPublishRateLimiterLock = new Object();
 
     protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
 
@@ -1079,32 +1080,34 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
      * update topic publish dispatcher for this topic.
      */
     public void updatePublishDispatcher() {
-        PublishRate publishRate = topicPolicies.getPublishRate().get();
-        if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
-            log.info("Enabling publish rate limiting {} ", publishRate);
-            if (!preciseTopicPublishRateLimitingEnable) {
-                this.brokerService.setupTopicPublishRateLimiterMonitor();
-            }
+        synchronized (topicPublishRateLimiterLock) {
+            PublishRate publishRate = topicPolicies.getPublishRate().get();
+            if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
+                log.info("Enabling publish rate limiting {} ", publishRate);
+                if (!preciseTopicPublishRateLimitingEnable) {
+                    this.brokerService.setupTopicPublishRateLimiterMonitor();
+                }
 
-            if (this.topicPublishRateLimiter == null
+                if (this.topicPublishRateLimiter == null
                     || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
-                // create new rateLimiter if rate-limiter is disabled
-                if (preciseTopicPublishRateLimitingEnable) {
-                    this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
+                    // create new rateLimiter if rate-limiter is disabled
+                    if (preciseTopicPublishRateLimitingEnable) {
+                        this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
                             () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
+                    } else {
+                        this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+                    }
                 } else {
-                    this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+                    this.topicPublishRateLimiter.update(publishRate);
                 }
             } else {
-                this.topicPublishRateLimiter.update(publishRate);
-            }
-        } else {
-            log.info("Disabling publish throttling for {}", this.topic);
-            if (topicPublishRateLimiter != null) {
-                topicPublishRateLimiter.close();
+                log.info("Disabling publish throttling for {}", this.topic);
+                if (topicPublishRateLimiter != null) {
+                    topicPublishRateLimiter.close();
+                }
+                this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
+                enableProducerReadForPublishRateLimiting();
             }
-            this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
-            enableProducerReadForPublishRateLimiting();
         }
     }
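
The patch serializes updatePublishDispatcher on a dedicated topicPublishRateLimiterLock instead of the topic instance itself, so the check-then-replace of the volatile topicPublishRateLimiter stays atomic without blocking unrelated synchronized paths on the topic. A rough sketch of the dedicated-lock pattern, with hypothetical names:

    class RateLimiterHolder {
        private final Object limiterLock = new Object();
        private volatile AutoCloseable limiter;

        void update(boolean enabled) throws Exception {
            synchronized (limiterLock) {       // check-then-act stays atomic
                if (enabled) {
                    if (limiter == null) {
                        limiter = () -> { };   // stand-in for a real limiter
                    }
                } else if (limiter != null) {
                    limiter.close();
                    limiter = null;
                }
            }
        }
    }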
 


[pulsar] 07/17: [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d1db33ee114672903c7512559f8918f9156d7ae8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed May 25 15:37:33 2022 +0800

    [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
    
    * [improve] [tiered-storage] Add pure S3 provider for the offloader
    ---
    
    *Motivation*
    
    Several cloud storage services are compatible with the S3 API,
    such as aliyun-oss, and users want to offload data into them, but
    only AWS and Aliyun are supported today.
    PR https://github.com/apache/pulsar/pull/8985 added the Aliyun
    offload provider, but it forces
    `S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS`, a restriction that
    does not apply to other S3-compatible storage services.
    This PR adds a more general offload provider, `S3`, which uses
    pure JClouds S3 metadata and allows the default JClouds properties
    to be overridden through system properties.
    
    *Modifications*
    
    - Add the pure S3 offload provider
    
    (cherry picked from commit 047cb0e3117d55a79c0082c0dc3d2ab3c9bcd6f9)
---
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 54 ++++++++++++++++------
 .../provider/TieredStorageConfiguration.java       | 13 ++++++
 .../provider/JCloudBlobStoreProviderTests.java     | 31 ++++++++++++-
 .../provider/TieredStorageConfigurationTests.java  | 17 +++++++
 4 files changed, 99 insertions(+), 16 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 86d5fe57d71..a19049ac2d4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -177,17 +177,34 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
     ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
-            ALIYUN_OSS_VALIDATION.validate(config);
+            S3_VALIDATION.validate(config);
         }
 
         @Override
         public BlobStore getBlobStore(TieredStorageConfiguration config) {
-            return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
         }
 
         @Override
         public void buildCredentials(TieredStorageConfiguration config) {
-            ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+    },
+
+    S3("S3", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            S3_VALIDATION.validate(config);
         }
     },
 
@@ -370,12 +387,14 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
+    static final BlobStoreBuilder S3_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
         contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
         Properties overrides = config.getOverrides();
-        // For security reasons, OSS supports only virtual hosted style access.
-        overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        if (ALIYUN_OSS.getDriver().equals(config.getDriver())) {
+            // For security reasons, OSS supports only virtual hosted style access.
+            overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        }
         contextBuilder.overrides(overrides);
         contextBuilder.endpoint(config.getServiceEndpoint());
 
@@ -392,7 +411,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
+    static final ConfigValidation S3_VALIDATION = (TieredStorageConfiguration config) -> {
         if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
             throw new IllegalArgumentException(
                     "ServiceEndpoint must specified for " + config.getDriver() + " offload");
@@ -410,14 +429,21 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
-        String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
-        if (StringUtils.isEmpty(accountName)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
+    static final CredentialBuilder S3_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+        String accountName = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+        // For forward compatibility
+        if (StringUtils.isEmpty(accountName.trim())) {
+            accountName = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_ID", "");
+        }
+        if (StringUtils.isEmpty(accountName.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key id.");
+        }
+        String accountKey = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            accountKey = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_SECRET", "");
         }
-        String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
-        if (StringUtils.isEmpty(accountKey)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key secret.");
         }
         Credentials credentials = new Credentials(
                 accountName, accountKey);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index c1054969a42..18e3bbf0db8 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -329,6 +329,19 @@ public class TieredStorageConfiguration {
             overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
         }
 
+        // load more jclouds properties into the overrides
+        System.getProperties().entrySet().stream()
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
+        System.getenv().entrySet().stream()
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
         log.info("getOverrides: {}", overrides.toString());
         return overrides;
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 28e5829ba2a..4f0c60bc007 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -23,8 +23,6 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.testng.annotations.Test;
 
 public class JCloudBlobStoreProviderTests {
@@ -105,4 +103,33 @@ public class JCloudBlobStoreProviderTests {
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.TRANSIENT.validate(config);
     }
+
+    @Test()
+    public void s3ValidationTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        map.put("managedLedgerOffloadBucket", "test-s3-bucket");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "ServiceEndpoint must specified for S3 offload")
+    public void s3ValidationServiceEndpointMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "Bucket cannot be empty for S3 offload")
+    public void s3ValidationBucketMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index f80f3ceaa1a..bf5e046bf70 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
 import org.jclouds.domain.Credentials;
 import org.testng.annotations.Test;
 
@@ -205,4 +207,19 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
     }
+
+    @Test
+    public void overridePropertiesTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("s3ManagedLedgerOffloadServiceEndpoint", "http://localhost");
+        map.put("s3ManagedLedgerOffloadRegion", "my-region");
+        System.setProperty("jclouds.SystemPropertyA", "A");
+        System.setProperty("jclouds.region", "jclouds-region");
+        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+        Properties properties = config.getOverrides();
+        System.out.println(properties.toString());
+        assertEquals(properties.get("jclouds.region"), "jclouds-region");
+        assertEquals(config.getServiceEndpoint(), "http://localhost");
+        assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+    }
 }


[pulsar] 09/17: fix bug in getNumberOfEntriesInStorage (#15627)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fa862e30d698246228d7de92f19d0dc09cd3611a
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sat May 28 10:58:35 2022 +0800

    fix bug in getNumberOfEntriesInStorage (#15627)
    
    (cherry picked from commit a43981109a9322d94082ae0d87d0de53b8f237e8)
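    
    A minimal sketch of the off-by-one (plain Guava ranges over longs
    stand in for ledger positions; the concrete values are hypothetical):
    
    ```java
    import com.google.common.collect.ContiguousSet;
    import com.google.common.collect.DiscreteDomain;
    import com.google.common.collect.Range;
    
    public class OpenClosedCountSketch {
        static long count(Range<Long> r) {
            return ContiguousSet.create(r, DiscreteDomain.longs()).size();
        }
    
        public static void main(String[] args) {
            long markDelete = -1L; // "nothing consumed yet" sentinel
            long last = 9L;        // 10 entries stored at ids 0..9
            // Old upper bound lastPosition.getNext() counts one entry too many:
            System.out.println(count(Range.openClosed(markDelete, last + 1))); // 11
            // Fixed upper bound is the last position itself:
            System.out.println(count(Range.openClosed(markDelete, last)));     // 10
        }
    }
    ```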
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 424874a8dfa..23cc1734132 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1026,7 +1026,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     public long getNumberOfEntriesInStorage() {
-        return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition().getNext()));
+        return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index c05565842f6..8f6fdfe6640 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2274,6 +2274,35 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(targetPosition.getEntryId(), 4);
     }
 
+    @Test
+    public void testGetNumberOfEntriesInStorage() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl managedLedger =
+                (ManagedLedgerImpl) factory.open("testGetNumberOfEntriesInStorage", managedLedgerConfig);
+        // open cursor to prevent ledger to be deleted when ledger rollover
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
+        int numberOfEntries = 10;
+        for (int i = 0; i < numberOfEntries; i++) {
+            managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+
+        //trigger ledger rollover and wait for the new ledger created
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
+        managedLedger.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getLedgersInfo().size(), 2);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+        });
+        assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
+        assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
+        log.info("### ledgers {}", managedLedger.getLedgersInfo());
+        long length = managedCursor.getNumberOfEntriesInStorage();
+        assertEquals(length, numberOfEntries);
+    }
+
     @Test
     public void testEstimatedBacklogSize() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");


[pulsar] 03/17: Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a5034d6ccd7ed69230953386f2eced134f1a0f9e
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 16 10:07:47 2022 +0800

    Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)
    
    https://github.com/apache/pulsar/blob/58c82a71beb7506e422def391af532945be2b7a7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L377-L399
    
    The object used as the lock can be reassigned while the synchronized block at line 382 executes, which makes the lock useless.
    
    So use `dispatchRateLimiterLock` to synchronize.
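    
    A minimal sketch of the hazard, with simplified, assumed names
    rather than the actual broker classes:
    
    ```java
    import java.util.Optional;
    
    class RateLimiterHolder {
        private Optional<Object> limiter = Optional.empty();
        private final Object limiterLock = new Object(); // never reassigned
    
        void initBroken() {
            synchronized (limiter) {                 // locks the current Optional...
                limiter = Optional.of(new Object()); // ...then swaps the field, so the
            }                                        // next caller locks a different monitor
        }
    
        void initFixed() {
            synchronized (limiterLock) {             // every caller contends on one monitor
                if (!limiter.isPresent()) {
                    limiter = Optional.of(new Object());
                }
            }
        }
    }
    ```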
    
    (cherry picked from commit ff4e6000f2d58eff930178ae0c02ef9c5fffb47c)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 568ac5ae063..b1ccf0b23ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,6 +184,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public boolean msgChunkPublished;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
+    private final Object dispatchRateLimiterLock = new Object();
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     private final long backloggedCursorThresholdEntries;
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
@@ -371,7 +372,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
-        synchronized (dispatchRateLimiter) {
+        synchronized (dispatchRateLimiterLock) {
             // dispatch rate limiter for topic
             if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
                     .isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
@@ -3096,7 +3097,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
-        synchronized (dispatchRateLimiter) {
+        synchronized (dispatchRateLimiterLock) {
             if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
                 this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
             }


[pulsar] 13/17: [Revert] [#15483] Remove sensitive msg from consumer/producer stats log (#15817)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f8bc91f13fcf1ed509d8323dd53be31cdae7d891
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sun Jun 5 09:44:47 2022 +0800

    [Revert] [#15483] Remove sensitive msg from consumer/producer stats log (#15817)
    
    ### Motivation
    See #15483
    The `@Secret` annotation works well; it was introduced in #8910.
    
    ### Modifications
    - Revert the unneeded `@JsonIgnore`
    - Remove `Assert.assertFalse(s.contains("Password"));`, because `Password` appears in a field name (the key); the sensitive field's value is already masked as `****`.
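    
    A hedged illustration of why that assertion was wrong (the JSON
    fragment is a hypothetical example of the serialized output):
    
    ```java
    public class SecretMaskingExample {
        public static void main(String[] args) {
            String s = "{\"tlsTrustStorePassword\":\"****\"}";
            System.out.println(s.contains("Password")); // true, because of the key name
            System.out.println(s.contains("xxxx"));     // false, the secret value is masked
        }
    }
    ```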
    
    (cherry picked from commit 67361e8db632b0cd4c23198c5c569f3f2193fc70)
---
 .../apache/pulsar/client/impl/conf/ClientConfigurationData.java    | 7 -------
 .../pulsar/client/impl/conf/ClientConfigurationDataTest.java       | 1 -
 2 files changed, 8 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index c599a65c5d6..6d99a9fa986 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -32,7 +32,6 @@ import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ProxyProtocol;
@@ -59,7 +58,6 @@ public class ClientConfigurationData implements Serializable, Cloneable {
             value = "The implementation class of ServiceUrlProvider used to generate ServiceUrl."
     )
     @JsonIgnore
-    @Getter(onMethod_ = @JsonIgnore)
     private transient ServiceUrlProvider serviceUrlProvider;
 
     @ApiModelProperty(
@@ -257,8 +255,6 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
 
     @Secret
-    @JsonIgnore
-    @Getter(onMethod_ = @JsonIgnore)
     private String tlsTrustStorePassword = null;
 
     @ApiModelProperty(
@@ -332,10 +328,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
 
     @Secret
-    @JsonIgnore
     private String socks5ProxyPassword;
 
-    @JsonIgnore
     public Authentication getAuthentication() {
         if (authentication == null) {
             this.authentication = AuthenticationDisabled.INSTANCE;
@@ -393,7 +387,6 @@ public class ClientConfigurationData implements Serializable, Cloneable {
         return Objects.nonNull(socks5ProxyUsername) ? socks5ProxyUsername : System.getProperty("socks5Proxy.username");
     }
 
-    @JsonIgnore
     public String getSocks5ProxyPassword() {
         return Objects.nonNull(socks5ProxyPassword) ? socks5ProxyPassword : System.getProperty("socks5Proxy.password");
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
index b5c30c9a7c6..c817ec996d4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
@@ -48,7 +48,6 @@ public class ClientConfigurationDataTest {
         clientConfigurationData.setSocks5ProxyPassword("yyyy");
         clientConfigurationData.setAuthentication(new AuthenticationToken("zzzz"));
         String s = w.writeValueAsString(clientConfigurationData);
-        Assert.assertFalse(s.contains("Password"));
         Assert.assertFalse(s.contains("xxxx"));
         Assert.assertFalse(s.contains("yyyy"));
         Assert.assertFalse(s.contains("zzzz"));


[pulsar] 10/17: Fix NPE in MessageDeduplication. (#15820)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b4c704e1f6c2f8a93b702700d4a53747da4c5fae
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jun 1 11:09:01 2022 +0800

    Fix NPE in MessageDeduplication. (#15820)
    
    (cherry picked from commit 01d7bfa681b23d1a236b1411b83e854c9ad9323f)
---
 .../pulsar/broker/service/persistent/MessageDeduplication.java     | 2 +-
 .../pulsar/broker/service/persistent/MessageDuplicationTest.java   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 761a8a65d2a..486f3d8540d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -471,7 +471,7 @@ public class MessageDeduplication {
                 hasInactive = true;
             }
         }
-        if (hasInactive) {
+        if (hasInactive && isEnabled()) {
             takeSnapshot(getManagedCursor().getMarkDeletedPosition());
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 765d7463f98..8b379928a83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -199,6 +199,13 @@ public class MessageDuplicationTest {
         messageDeduplication.purgeInactiveProducers();
         assertEquals(inactiveProducers.size(), 3);
 
+        doReturn(false).when(messageDeduplication).isEnabled();
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000);
+        messageDeduplication.purgeInactiveProducers();
+        assertFalse(inactiveProducers.containsKey(producerName2));
+        assertFalse(inactiveProducers.containsKey(producerName3));
+        doReturn(true).when(messageDeduplication).isEnabled();
         // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
         inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);


[pulsar] 17/17: Fix cherry-pick issue

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d50201d418354743e67cc33c1a9e6c44e1f250cc
Author: penghui <pe...@apache.org>
AuthorDate: Tue Jun 7 13:03:22 2022 +0800

    Fix cherry-pick issue
---
 .../org/apache/pulsar/broker/service/MessageCumulativeAckTest.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 86754efc0c2..2b67a58e43d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -155,7 +155,7 @@ public class MessageCumulativeAckTest {
     public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
         Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
             "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+                null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);
@@ -170,7 +170,7 @@ public class MessageCumulativeAckTest {
     public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
         Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
             "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+            null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);
@@ -185,7 +185,7 @@ public class MessageCumulativeAckTest {
     public void testAckWithMoreThanNoneMessageIds() throws Exception {
         Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0,
             "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+            null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);


[pulsar] 11/17: Fix avro conversion order of registration (#15863)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 784c9c83be28d885accc3e6fe00b2e2c18d02a63
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Jun 1 16:58:48 2022 +0800

    Fix avro conversion order of registration (#15863)
    
    ### Motivation
    
    Fixes #15858
    
    A conversion registered earlier takes priority over one registered later, so `TimestampMillisConversion` must not be registered after `TimestampMicrosConversion`.
    
    ### Modifications
    
    Fix the registration order of the Avro logical-type conversions, as sketched below.
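    
    A minimal sketch of the ordering rule (the priority behaviour is
    taken from the note above; class names are from `org.apache.avro`):
    
    ```java
    import org.apache.avro.data.TimeConversions;
    import org.apache.avro.reflect.ReflectData;
    
    public class ConversionOrderSketch {
        public static void main(String[] args) {
            ReflectData reflectData = new ReflectData();
            // Registered first, so it takes priority for java.time.Instant fields:
            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
            // An Instant field now induces timestamp-millis rather than timestamp-micros.
        }
    }
    ```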
    
    (cherry picked from commit 311fdb5dad09217c1706409feb3387d59285c51f)
---
 .../pulsar/client/impl/schema/AvroSchema.java       |  3 ++-
 .../pulsar/client/impl/schema/AvroSchemaTest.java   | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index def27ecea53..3d0bf157cb3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -115,8 +115,8 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
         reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
         reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
         reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
-        reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
         if (jsr310ConversionEnabled) {
+            // The conversion that is registered first is higher priority than the registered later.
             reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
         } else {
             try {
@@ -125,6 +125,7 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
             } catch (ClassNotFoundException e) {
                 // Skip if have not provide joda-time dependency.
             }
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
         }
         reflectData.addLogicalTypeConversion(new Conversions.UUIDConversion());
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 9e707af8367..d69f8bf66ba 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -42,6 +42,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaValidationException;
 import org.apache.avro.SchemaValidator;
 import org.apache.avro.SchemaValidatorBuilder;
+import org.apache.avro.data.TimeConversions;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.BufferedBinaryEncoder;
 import org.apache.avro.reflect.AvroDefault;
@@ -459,4 +460,24 @@ public class AvroSchemaTest {
         assertEquals(pojo2.value1, myBigDecimalPojo.value1);
         assertEquals(pojo2.value2, myBigDecimalPojo.value2);
     }
+
+
+    @Data
+    private static class TimestampStruct {
+        Instant value;
+    }
+
+    @Test
+    public void testTimestampWithJsr310Conversion() {
+        AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class);
+        Assert.assertEquals(
+                schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
+                new TimeConversions.TimestampMicrosConversion().getLogicalTypeName());
+
+        AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder()
+                .withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build());
+        Assert.assertEquals(
+                schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
+                new TimeConversions.TimestampMillisConversion().getLogicalTypeName());
+    }
 }


[pulsar] 14/17: [fix][broker]Fast return if ack cumulative illegal (#15695)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5bf69dc1cd7e5ac5e9f211a02feba7e62a832408
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Jun 6 10:59:55 2022 +0800

    [fix][broker]Fast return if ack cumulative illegal (#15695)
    
    (cherry picked from commit 02dca31a78523a7d061ac1d6702ff6600a7f4ec4)
---
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +
 .../broker/service/MessageCumulativeAckTest.java   | 199 +++++++++++++++++++++
 2 files changed, 201 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 7eabeaafd1f..e7651706127 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -376,11 +376,13 @@ public class Consumer {
         if (ack.getAckType() == AckType.Cumulative) {
             if (ack.getMessageIdsCount() != 1) {
                 log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId);
+                return CompletableFuture.completedFuture(null);
             }
 
             if (Subscription.isIndividualAckMode(subType)) {
                 log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring",
                         subscription, consumerId);
+                return CompletableFuture.completedFuture(null);
             }
             PositionImpl position = PositionImpl.EARLIEST;
             if (ack.getMessageIdsCount() == 1) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
new file mode 100644
index 00000000000..86754efc0c2
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Cumulative;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared;
+import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MessageCumulativeAckTest {
+    private final int consumerId = 1;
+    private BrokerService brokerService;
+    private ServerCnx serverCnx;
+    private MetadataStore store;
+    protected PulsarService pulsar;
+    private OrderedExecutor executor;
+    private EventLoopGroup eventLoopGroup;
+    private PersistentSubscription sub;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
+        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
+        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        svcConfig.setClusterName("pulsar-cluster");
+        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
+        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        doReturn(TransactionTestBase.createMockBookKeeper(executor))
+            .when(pulsar).getBookKeeperClient();
+
+        store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+        PulsarResources pulsarResources = new PulsarResources(store, store);
+        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+
+        serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+        doReturn(true).when(serverCnx).isActive();
+        doReturn(true).when(serverCnx).isWritable();
+        doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
+        when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
+        when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
+        doReturn(new PulsarCommandSenderImpl(null, serverCnx))
+            .when(serverCnx).getCommandSender();
+
+        eventLoopGroup = new NioEventLoopGroup();
+        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
+        doReturn(brokerService).when(pulsar).getBrokerService();
+
+        String topicName = TopicName.get("MessageCumulativeAckTest").toString();
+        PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);
+        sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
+            mock(ManagedCursorImpl.class), false));
+        doNothing().when(sub).acknowledgeMessage(any(), any(), any());
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() throws Exception {
+        if (brokerService != null) {
+            brokerService.close();
+            brokerService = null;
+        }
+        if (pulsar != null) {
+            pulsar.close();
+            pulsar = null;
+        }
+
+        executor.shutdown();
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully().get();
+        }
+        store.close();
+        sub = null;
+    }
+
+    @DataProvider(name = "individualAckModes")
+    public static Object[][] individualAckModes() {
+        return new Object[][]{
+            {Shared},
+            {Key_Shared},
+        };
+    }
+
+    @DataProvider(name = "notIndividualAckModes")
+    public static Object[][] notIndividualAckModes() {
+        return new Object[][]{
+            {Exclusive},
+            {Failover},
+        };
+    }
+
+    @Test(timeOut = 5000, dataProvider = "individualAckModes")
+    public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
+        Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, never()).acknowledgeMessage(any(), any(), any());
+    }
+
+    @Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
+    public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
+        Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, times(1)).acknowledgeMessage(any(), any(), any());
+    }
+
+    @Test(timeOut = 5000)
+    public void testAckWithMoreThanNoneMessageIds() throws Exception {
+        Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(2L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, never()).acknowledgeMessage(any(), any(), any());
+    }
+}


[pulsar] 06/17: [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2e5c984b201f99759c71fff2e2757860c69d4aae
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue May 24 12:15:25 2022 +0800

    [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)
    
    ### Motivation
    
    If a `ReferenceCounted` value overrides `deallocate` so that the state used to compute its size becomes null, `RangeCache#put` can throw an NPE, because the method is not thread-safe. (This is hard to describe in the abstract; please refer to the code modification.)
    
    The Pulsar implementation may then throw an exception that makes `OpAddEntry` fail abnormally and fences the topic. The relevant code is below:
    
    https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L211-L217
    
    **Exception stack trace:**
    
    ```java
    java.lang.NullPointerException: Cannot invoke "String.length()" because "value.s" is null
    
            at org.apache.bookkeeper.mledger.util.RangeCacheTest.lambda$testInParallel$6(RangeCacheTest.java:279)
            at org.apache.bookkeeper.mledger.util.RangeCache.put(RangeCache.java:77)
            at org.apache.bookkeeper.mledger.util.RangeCacheTest.testInParallel(RangeCacheTest.java:283)
            at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
            at java.base/java.lang.reflect.Method.invoke(Method.java:577)
            at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
            at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
            at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
            at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
            at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
            at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
            at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
            at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
            at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
            at org.testng.TestRunner.privateRun(TestRunner.java:764)
            at org.testng.TestRunner.run(TestRunner.java:585)
            at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
            at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
            at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
            at org.testng.SuiteRunner.run(SuiteRunner.java:286)
            at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
            at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
            at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
            at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
            at org.testng.TestNG.runSuites(TestNG.java:1069)
            at org.testng.TestNG.run(TestNG.java:1037)
            at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
            at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
    ```
    
    ### Modifications
    
    - Make the `RangeCache#put` method thread-safe; a sketch of the race follows.
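    
    A self-contained sketch of the race and the fix (the class below is
    a simplified stand-in, not the actual `RangeCache`):
    
    ```java
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.ToLongFunction;
    import org.apache.commons.lang3.mutable.MutableBoolean;
    
    // Race the old putIfAbsent-based code allowed:
    //   T1: entries.putIfAbsent(key, value) succeeds, value becomes visible
    //   T2: clear() evicts the entry and releases the value, nulling its state
    //   T1: weighter.getSize(value) reads the nulled state -> NPE
    // computeIfAbsent weighs the value inside the map's atomic compute
    // block, before any concurrent eviction can see and release it.
    class RangeCachePutSketch<K, V> {
        private final ConcurrentHashMap<K, V> entries = new ConcurrentHashMap<>();
        private final AtomicLong size = new AtomicLong();
    
        boolean put(K key, V value, ToLongFunction<V> weighter) {
            MutableBoolean inserted = new MutableBoolean(false);
            entries.computeIfAbsent(key, k -> {
                size.addAndGet(weighter.applyAsLong(value));
                inserted.setTrue();
                return value;
            });
            return inserted.booleanValue();
        }
    }
    ```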
    
    (cherry picked from commit b155d84c2ee397fe8003f452f04ae6cedf229b5c)
---
 .../apache/bookkeeper/mledger/util/RangeCache.java | 12 +++++----
 .../bookkeeper/mledger/util/RangeCacheTest.java    | 29 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index a5b31335f35..1de9429d7c0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
@@ -73,12 +74,13 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
      * @return whether the entry was inserted in the cache
      */
     public boolean put(Key key, Value value) {
-        if (entries.putIfAbsent(key, value) == null) {
+        MutableBoolean flag = new MutableBoolean();
+        entries.computeIfAbsent(key, (k) -> {
             size.addAndGet(weighter.getSize(value));
-            return true;
-        } else {
-            return false;
-        }
+            flag.setValue(true);
+            return value;
+        });
+        return flag.booleanValue();
     }
 
     public Value get(Key key) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index 95896d24f35..f31aa4a74f9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -29,11 +29,15 @@ import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.ReferenceCounted;
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.annotations.Test;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class RangeCacheTest {
 
     class RefString extends AbstractReferenceCounted implements ReferenceCounted {
-        final String s;
+        String s;
 
         RefString(String s) {
             super();
@@ -43,7 +47,7 @@ public class RangeCacheTest {
 
         @Override
         protected void deallocate() {
-            // no-op
+            s = null;
         }
 
         @Override
@@ -122,6 +126,7 @@ public class RangeCacheTest {
         assertEquals(cache.getNumberOfEntries(), 2);
     }
 
+
     @Test
     public void customTimeExtraction() {
         RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length());
@@ -268,4 +273,24 @@ public class RangeCacheTest {
         assertEquals((long) res.getRight(), 10);
         assertEquals(cache.getSize(), 90);
     }
+
+    @Test
+    public void testInParallel() {
+        RangeCache<String, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleWithFixedDelay(cache::clear, 10, 10, TimeUnit.MILLISECONDS);
+        for (int i = 0; i < 1000; i++) {
+            cache.put(UUID.randomUUID().toString(), new RefString("zero"));
+        }
+        executor.shutdown();
+    }
+
+    @Test
+    public void testPutSameObj() {
+        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+        RefString s0 = new RefString("zero");
+        assertEquals(s0.refCnt(), 1);
+        assertTrue(cache.put(0, s0));
+        assertFalse(cache.put(0, s0));
+    }
 }


[pulsar] 01/17: [Transaction] Fix cursor readPosition is bigger than maxPosition in OpReadEntry (#14667)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eed7825d8ee76cb77ab492ac30b981df1ce918ad
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Mar 14 16:51:54 2022 +0800

    [Transaction] Fix cursor readPosition is bigger than maxPosition in OpReadEntry (#14667)
    
    ### Motivation
    ### Motivation
    Fix a dead loop in the cursor read operation.
    ### Modifications
    1. `OpReadEntry` cannot use the cursor's `readPosition` to decide whether it can finish: when a ledger is empty, the `OpReadEntry` read position is not equal to the cursor read position, so the check must use the `OpReadEntry`'s own read position.
    2. When the read position is bigger than `maxPosition` in `OpReadEntry`, the `OpReadEntry` should be completed.
    ### Verifying this change
    Add a test; a sketch of the fixed check follows.
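    
    A minimal sketch of the fixed completion check (a hypothetical
    helper; positions are reduced to plain comparables):
    
    ```java
    public class ReadCompletionSketch {
        // Continue reading only while the op's own read position is still
        // below maxPosition; the cursor's read position can already have
        // moved past an empty ledger and kept the old check satisfied
        // indefinitely (the dead loop above).
        static <P extends Comparable<P>> boolean canReadMore(
                int entriesRead, int requestedCount, boolean cursorHasMoreEntries,
                P maxPosition, P opReadPosition) {
            return entriesRead < requestedCount
                    && cursorHasMoreEntries
                    && maxPosition.compareTo(opReadPosition) > 0;
        }
    }
    ```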
    
    (cherry picked from commit a242f03f69791fcfa1934b6cab20689393228381)
---
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 43 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 210ad31063a..178ee2dedab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -134,8 +134,9 @@ class OpReadEntry implements ReadEntriesCallback {
     }
 
     void checkReadCompletion() {
+        // op readPosition is smaller or equals maxPosition then can read again
         if (entries.size() < count && cursor.hasMoreEntries()
-                && ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
+                && maxPosition.compareTo(readPosition) > 0) {
             // We still have more entries to read from the next ledger, schedule a new async operation
             if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
                 cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a2127cb7fd1..d64cac6bbd3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -30,7 +30,6 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
@@ -59,7 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import lombok.Cleanup;
@@ -3713,5 +3711,46 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertNotEquals(cursor.getCursorLedger(), initialLedgerId);
     }
 
+    @Test
+    public void testReadEmptyEntryList() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(1);
+        managedLedgerConfig.setMetadataMaxEntriesPerLedger(1);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
+                .open("testReadEmptyEntryList", managedLedgerConfig);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
+
+        PositionImpl lastPosition = (PositionImpl) ledger.addEntry("test".getBytes(Encoding));
+        ledger.rollCurrentLedgerIfFull();
+
+        AtomicBoolean flag = new AtomicBoolean();
+        flag.set(false);
+        ReadEntriesCallback callback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                if (entries.size() == 0) {
+                    flag.set(true);
+                }
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+            }
+        };
+
+        // op readPosition is bigger than maxReadPosition
+        OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
+                null, PositionImpl.get(lastPosition.getLedgerId(), -1));
+        Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
+        field.setAccessible(true);
+        field.set(cursor, PositionImpl.EARLIEST);
+        ledger.asyncReadEntries(opReadEntry);
+
+        // when readPosition is bigger than maxReadPosition, should complete the opReadEntry
+        Awaitility.await().untilAsserted(() -> assertTrue(flag.get()));
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }


[pulsar] 04/17: [optimize][txn] Optimize transaction lowWaterMark to clean useless data faster (#15592)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fa78cf8f45ba40f5e699f3510606db444846b323
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat May 21 17:15:56 2022 +0800

    [optimize][txn] Optimize transaction lowWaterMark to clean useless data faster (#15592)
    
    (cherry picked from commit fcf5e148b055d617db37eef3c40d4004e74190a5)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  72 ++++++-----
 .../pendingack/impl/PendingAckHandleImpl.java      |  49 ++++++--
 .../buffer/TransactionLowWaterMarkTest.java        | 140 ++++++++++++++++++++-
 .../pendingack/PendingAckPersistentTest.java       |  20 ++-
 4 files changed, 223 insertions(+), 58 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c9cde544c80..6469ba9fb9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.SneakyThrows;
@@ -96,8 +97,13 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>();
 
+    /**
+     * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC.
+     */
     private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
 
+    private final Semaphore handleLowWaterMark = new Semaphore(1);
+
     public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
         this.topic = topic;
@@ -285,13 +291,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
-        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
-            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
-                return lowWaterMark;
-            } else {
-                return oldLowWaterMark;
-            }
-        });
         if (log.isDebugEnabled()) {
             log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName());
         }
@@ -332,13 +331,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
-        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
-            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
-                return lowWaterMark;
-            } else {
-                return oldLowWaterMark;
-            }
-        });
         if (log.isDebugEnabled()) {
             log.debug("Transaction {} abort on topic {}.", txnID.toString(), topic.getName());
         }
@@ -358,12 +350,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                         synchronized (TopicTransactionBuffer.this) {
                             aborts.put(txnID, (PositionImpl) position);
                             updateMaxReadPosition(txnID);
-                            handleLowWaterMark(txnID, lowWaterMark);
                             changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
                             clearAbortedTransactions();
                             takeSnapshotByChangeTimes();
                         }
                         completableFuture.complete(null);
+                        handleLowWaterMark(txnID, lowWaterMark);
                     }
 
                     @Override
@@ -384,30 +376,36 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     }
 
     private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
-        if (!ongoingTxns.isEmpty()) {
-            TxnID firstTxn = ongoingTxns.firstKey();
-            if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && lowWaterMark >= firstTxn.getLeastSigBits()) {
-                ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
-                        firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
-                try {
-                    topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
-                        @Override
-                        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                            synchronized (TopicTransactionBuffer.this) {
-                                aborts.put(firstTxn, (PositionImpl) position);
-                                updateMaxReadPosition(firstTxn);
-                            }
-                        }
-
-                        @Override
-                        public void addFailed(ManagedLedgerException exception, Object ctx) {
-                            log.error("Failed to abort low water mark for txn {}", txnID, exception);
-                        }
-                    }, null);
-                } finally {
-                    abortMarker.release();
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
+        if (handleLowWaterMark.tryAcquire()) {
+            if (!ongoingTxns.isEmpty()) {
+                TxnID firstTxn = ongoingTxns.firstKey();
+                long tCId = firstTxn.getMostSigBits();
+                Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+                if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+                    abortTxn(firstTxn, lowWaterMarkOfFirstTxnId)
+                            .thenRun(() -> {
+                                log.warn("Succeeded to abort low water mark for txn [{}], topic [{}],"
+                                        + " lowWaterMark [{}]", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId);
+                                handleLowWaterMark.release();
+                            })
+                            .exceptionally(ex -> {
+                                log.warn("Failed to abort low water mark for txn [{}], topic [{}], "
+                                        + "lowWaterMark [{}]", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId,
+                                        ex);
+                                handleLowWaterMark.release();
+                                return null;
+                            });
+                    return;
                 }
             }
+            handleLowWaterMark.release();
         }
     }
 
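The Semaphore above acts as a non-blocking single-flight guard: tryAcquire() admits at most one cleanup at a time, callers that lose the race return immediately (their update to lowWaterMarks is already recorded for the winner to see), and every completion path must release the permit or no cleanup ever runs again. A standalone sketch of the pattern, simplified to a single release point and using hypothetical names:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Semaphore;

    class SingleFlightCleanup {
        private final Semaphore permit = new Semaphore(1);

        void maybeCleanup() {
            if (!permit.tryAcquire()) {
                return; // a cleanup is already in flight and will see the latest state
            }
            // Release on success and failure alike, or cleanup stalls permanently.
            cleanupAsync().whenComplete((ignore, ex) -> permit.release());
        }

        // Placeholder for the real asynchronous work (e.g. aborting the oldest txn).
        private CompletableFuture<Void> cleanupAsync() {
            return CompletableFuture.completedFuture(null);
        }
    }
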
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 7e11592e063..323a1414e43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -29,10 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -114,6 +116,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
     private final BlockingQueue<Runnable> acceptQueue = new LinkedBlockingDeque<>();
 
+    /**
+     * Maps each transaction coordinator (TC) ID to the highest low water mark received from that TC.
+     */
+    private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
+
+    private final Semaphore handleLowWaterMark = new Semaphore(1);
+
     @Getter
     private final ExecutorService internalPinnedExecutor;
 
@@ -595,20 +604,34 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     }
 
     private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
-        if (individualAckOfTransaction != null && !individualAckOfTransaction.isEmpty()) {
-            TxnID firstTxn = individualAckOfTransaction.firstKey();
-
-            if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
-                    && firstTxn.getLeastSigBits() <= lowWaterMark) {
-                abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
-                    log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
-                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                }).exceptionally(e -> {
-                    log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
-                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                    return null;
-                });
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
+
+        if (handleLowWaterMark.tryAcquire()) {
+            if (individualAckOfTransaction != null && !individualAckOfTransaction.isEmpty()) {
+                TxnID firstTxn = individualAckOfTransaction.firstKey();
+                long tCId = firstTxn.getMostSigBits();
+                Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+                if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+                    abortTxn(firstTxn, null, lowWaterMarkOfFirstTxnId).thenRun(() -> {
+                        log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+                                + "lowWaterMark : [{}]", topicName, firstTxn, lowWaterMarkOfFirstTxnId);
+                        handleLowWaterMark.release();
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+                                + "lowWaterMark : [{}]", topicName, firstTxn, lowWaterMarkOfFirstTxnId, ex);
+                        handleLowWaterMark.release();
+                        return null;
+                    });
+                    return;
+                }
             }
+            handleLowWaterMark.release();
         }
     }
 
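PendingAckHandleImpl above applies the same two-step pattern to transactional acks. The check is sound because a TxnID encodes the coordinator id in its most significant bits and a per-coordinator sequence number in its least significant bits, so a leftover transaction whose sequence is at or below its own TC's low water mark has already ended on the coordinator and is safe to abort locally. A minimal sketch of that decision, with hypothetical names (java.util.Map import assumed):

    // Hypothetical sketch: is the oldest tracked txn below its own TC's low water mark?
    private boolean isBelowLowWaterMark(TxnID firstTxn, Map<Long, Long> lowWaterMarks) {
        Long mark = lowWaterMarks.get(firstTxn.getMostSigBits()); // this txn's TC
        return mark != null && firstTxn.getLeastSigBits() <= mark;
    }
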
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index ba0659892b4..a1ff3e7d34a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -37,7 +37,9 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -71,7 +73,7 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionLowWaterMarkTest extends TransactionTestBase {
 
-    private static final String TOPIC = NAMESPACE1 + "/test-topic";
+    private static final String TOPIC = "persistent://" + NAMESPACE1 + "/test-topic";
 
     @BeforeMethod(alwaysRun = true)
     protected void setup() throws Exception {
@@ -216,7 +218,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
             ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
                     (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
                             .get(getPulsarServiceList().get(i).getBrokerService());
-            CompletableFuture<Optional<Topic>> completableFuture = topics.get("persistent://" + TOPIC);
+            CompletableFuture<Optional<Topic>> completableFuture = topics.get(TOPIC);
             if (completableFuture != null) {
                 Optional<Topic> topic = completableFuture.get();
                 if (topic.isPresent()) {
@@ -327,4 +329,138 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
             // no-op
         }
     }
+
+    @Test
+    public void testLowWaterMarkForDifferentTC() throws Exception {
+        String subName = "sub";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(TOPIC)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(TOPIC)
+                .subscriptionName(subName)
+                .subscribe();
+
+        Transaction txn1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        while (txn2.getTxnID().getMostSigBits() == txn1.getTxnID().getMostSigBits()) {
+            txn2 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(500, TimeUnit.SECONDS)
+                    .build().get();
+        }
+        Transaction txn3 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        while (txn3.getTxnID().getMostSigBits() != txn2.getTxnID().getMostSigBits()) {
+            txn3 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(500, TimeUnit.SECONDS)
+                    .build().get();
+        }
+
+        Transaction txn4 = pulsarClient.newTransaction()
+                .withTransactionTimeout(500, TimeUnit.SECONDS)
+                .build().get();
+        while (txn4.getTxnID().getMostSigBits() != txn1.getTxnID().getMostSigBits()) {
+            txn4 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(500, TimeUnit.SECONDS)
+                    .build().get();
+        }
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().send();
+        }
+
+        producer.newMessage(txn1).send();
+        producer.newMessage(txn2).send();
+        producer.newMessage(txn3).send();
+        producer.newMessage(txn4).send();
+
+        Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message1.getMessageId(), txn1);
+        Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message2.getMessageId(), txn2);
+        Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message3.getMessageId(), txn3);
+        Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message4.getMessageId(), txn4);
+
+        txn1.commit().get();
+        txn2.commit().get();
+
+        Field field = TransactionImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(txn1, TransactionImpl.State.OPEN);
+        field.set(txn2, TransactionImpl.State.OPEN);
+
+        producer.newMessage(txn1).send();
+        producer.newMessage(txn2).send();
+
+        Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message5.getMessageId(), txn1);
+        Message<byte[]> message6 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message6.getMessageId(), txn2);
+
+        txn3.commit().get();
+        TxnID txnID1 = txn1.getTxnID();
+        TxnID txnID2 = txn2.getTxnID();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(checkTxnIsOngoingInTP(txnID1, subName));
+            assertTrue(checkTxnIsOngoingInTP(txnID2, subName));
+            assertTrue(checkTxnIsOngoingInTB(txnID1));
+            assertTrue(checkTxnIsOngoingInTB(txnID2));
+        });
+
+        txn4.commit().get();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(checkTxnIsOngoingInTP(txnID1, subName));
+            assertFalse(checkTxnIsOngoingInTP(txnID2, subName));
+            assertFalse(checkTxnIsOngoingInTB(txnID1));
+            assertFalse(checkTxnIsOngoingInTB(txnID2));
+        });
+    }
+
+    private boolean checkTxnIsOngoingInTP(TxnID txnID, String subName) throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(TopicName.get(TOPIC).toString(), false)
+                .get().get();
+
+        PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+
+        Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+        field1.setAccessible(true);
+        PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) field1.get(persistentSubscription);
+
+        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+        field2.setAccessible(true);
+        LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
+                (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);
+        return individualAckOfTransaction.containsKey(txnID);
+    }
+
+    private boolean checkTxnIsOngoingInTB(TxnID txnID) throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(TopicName.get(TOPIC).toString(), false)
+                .get().get();
+
+        TopicTransactionBuffer topicTransactionBuffer =
+                (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        Field field3 = TopicTransactionBuffer.class.getDeclaredField("ongoingTxns");
+        field3.setAccessible(true);
+        LinkedMap<TxnID, PositionImpl> ongoingTxns =
+                (LinkedMap<TxnID, PositionImpl>) field3.get(topicTransactionBuffer);
+        return ongoingTxns.containsKey(txnID);
+
+    }
+
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 96b2b9c14bf..367c63797d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -542,24 +542,32 @@ public class PendingAckPersistentTest extends TransactionTestBase {
                         .get();
 
         PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+        Field field1 = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+        field1.setAccessible(true);
+        PendingAckHandleImpl oldPendingAckHandle = (PendingAckHandleImpl) field1.get(persistentSubscription);
+        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+        field2.setAccessible(true);
+        LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> oldIndividualAckOfTransaction =
+                (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(oldPendingAckHandle);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(oldIndividualAckOfTransaction.size(), 0));
+
         PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
 
         Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
         method.setAccessible(true);
         method.invoke(pendingAckHandle);
 
-        Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
-        field1.setAccessible(true);
-        CompletableFuture<PendingAckStore> completableFuture =
-                (CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle);
+        Field field3 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+        field3.setAccessible(true);
 
         Awaitility.await().until(() -> {
+            CompletableFuture<PendingAckStore> completableFuture =
+                    (CompletableFuture<PendingAckStore>) field3.get(pendingAckHandle);
             completableFuture.get();
             return true;
         });
 
-        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
-        field2.setAccessible(true);
+
         LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
                 (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);