You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 12:54:00 UTC

[pulsar] branch branch-2.10 updated (66cb1c4ee50 -> 17b84b67ece)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 66cb1c4ee50 [improve][broker]Remove unnecessary lock on the stats thread  (#16983)
     new fd8e5352173 [fix][broker] fix broker unackmessages number reduce error (#17003)
     new a0eb84ef194 [fix][txn] fix ack with txn compute ackedCount error (#17016)
     new b0939dc1366 [fix][client] Release semaphore before discarding messages in batchMessageContainer (#17019)
     new 17b84b67ece [fix][tiered-storage] move the state check forward (#17020)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/service/Consumer.java | 38 +++++-----
 .../BatchMessageWithBatchIndexLevelTest.java       | 82 ++++++++++++++++++++++
 .../client/impl/TransactionEndToEndTest.java       | 46 ++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  2 +-
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  7 ++
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 23 ++++++
 6 files changed, 180 insertions(+), 18 deletions(-)


[pulsar] 04/04: [fix][tiered-storage] move the state check forward (#17020)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 17b84b67ece2f661b910d3f6c48266b37a1b24c9
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Aug 11 21:01:55 2022 +0800

    [fix][tiered-storage] move the state check forward (#17020)
    
    * [fix][tiered-storage] move the state check forward
    ---
    
    *Motivation*
    
    Move the close check forward so that `getLastAddConfirmed()` does not
    throw an NPE.
    If the state is closed, the resource has already been closed and the
    `OffloadIndexBlock` has been recycled, which causes an NPE when
    `getLastAddConfirmed()` is called.
    
    (cherry picked from commit ee0ea3a6f9ffb42d4ec129eb689d3c1059e5f4a8)
---
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  7 +++++++
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 23 ++++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index d60ea1407ca..fae534454e0 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -106,6 +106,11 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             boolean seeked = false;
             try {
+                if (state == State.Closed) {
+                    log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                    throw new BKException.BKUnexpectedConditionException();
+                }
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
@@ -173,6 +178,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
 
                 promise.complete(LedgerEntriesImpl.create(entries));
             } catch (Throwable t) {
+                log.error("Failed to read entries {} - {} from the offloader in ledger {}",
+                    firstEntry, lastEntry, ledgerId, t);
                 promise.completeExceptionally(t);
                 entries.forEach(LedgerEntry::close);
             }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 77dfc55b777..ab979f8a5a1 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -496,4 +496,27 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
             fail("Get unexpected exception when reading entries", e);
         }
     }
+
+    @Test
+    public void testReadWithAClosedLedgerHandler() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = getOffloader();
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+        long lac = toTest.getLastAddConfirmed();
+        toTest.readAsync(0, lac).get();
+        toTest.closeAsync().get();
+        try {
+            toTest.readAsync(0, lac).get();
+        } catch (Exception e) {
+            if (e.getCause() instanceof BKException.BKUnexpectedConditionException) {
+                // expected exception
+                return;
+            }
+            throw e;
+        }
+    }
 }


[pulsar] 01/04: [fix][broker] fix broker unackmessages number reduce error (#17003)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fd8e53521730563206f600fdc785e6c4245636d9
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Aug 13 10:29:26 2022 +0800

    [fix][broker] fix broker unackmessages number reduce error (#17003)
    
    (cherry picked from commit 5262e6c8b4d2a98ac7f73a94a30f001630b2be28)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 13 ++--
 .../BatchMessageWithBatchIndexLevelTest.java       | 82 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index bf5b4085561..14faddf9d63 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -442,7 +442,7 @@ public class Consumer {
                     ackSets[j] = msgId.getAckSetAt(j);
                 }
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
-                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets);
+                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer);
                 if (isTransactionEnabled()) {
                     //sync the batch position bit set point, in order to delete the position in pending acks
                     if (Subscription.isIndividualAckMode(subType)) {
@@ -452,7 +452,7 @@ public class Consumer {
                 }
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
-                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position);
+                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
             }
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
@@ -560,20 +560,21 @@ public class Consumer {
         return batchSize;
     }
 
-    private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) {
+    private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) {
         if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
             long[] cursorAckSet = getCursorAckSet(position);
             if (cursorAckSet != null) {
-                return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
+                return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, consumer);
             }
         }
         return batchSize;
     }
 
-    private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
+    private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets,
+                                                        Consumer consumer) {
         long ackedCount = 0;
         if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)
-            && pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) {
+            && consumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) != null) {
             long[] cursorAckSet = getCursorAckSet(position);
             if (cursorAckSet != null) {
                 BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index b953772ad67..d5c4e1eb064 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -33,11 +33,13 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 
 @Test(groups = "broker")
 public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
@@ -195,4 +197,84 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
             assertEquals(unackedMessages, 10);
         });
     }
+
+    @Test
+    public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Exception {
+        final String subName = "test";
+        final String topicName = "persistent://prop/ns-abc/testAckMessageWithNotOwnerConsumerUnAckMessageCount-"
+                + UUID.randomUUID();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topicName)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .enableBatching(true)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .consumerName("consumer-1")
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .isAckReceiptEnabled(true)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .consumerName("consumer-2")
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .isAckReceiptEnabled(true)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync();
+        }
+
+        // consumer-1 receives 5 batch messages
+        List<MessageId> list = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            list.add(consumer1.receive().getMessageId());
+        }
+
+        // consumer-1 redeliver the batch messages
+        consumer1.negativeAcknowledge(list.get(0));
+
+        // consumer-2 will receive the messages that the consumer-1 redelivered
+        for (int i = 0; i < 5; i++) {
+            consumer2.receive().getMessageId();
+        }
+
+        // consumer1 ack two messages in the batch message
+        consumer1.acknowledge(list.get(1));
+        consumer1.acknowledge(list.get(2));
+
+        // consumer-2 redeliver the rest of the messages
+        consumer2.negativeAcknowledge(list.get(1));
+
+        // closing consumer-1 will redeliver the remaining messages to consumer-2
+        consumer1.close();
+
+        // consumer-2 can receive the rest of 3 messages
+        for (int i = 0; i < 3; i++) {
+            consumer2.acknowledge(consumer2.receive().getMessageId());
+        }
+
+        // consumer-2 can't receive any messages, all the messages in batch has been acked
+        Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
+        assertNull(message);
+
+        // the number of consumer-2's unacked messages is 0
+        Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false)
+                .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0);
+    }
 }


[pulsar] 03/04: [fix][client] Release semaphore before discarding messages in batchMessageContainer (#17019)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0939dc136653243404815f71131d299ad408aa4
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Aug 11 09:24:30 2022 +0800

    [fix][client] Release semaphore before discarding messages in batchMessageContainer (#17019)
    
    (cherry picked from commit 21dc668e2ca47c5e63c367be7b8d904ae05f6865)
---
 .../src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 133a4eca0cd..6a509166e8a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1957,9 +1957,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
         final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
         final long currentBatchSize = batchMessageContainer.getCurrentBatchSize();
-        batchMessageContainer.discard(ex);
         semaphoreRelease(numMessagesInBatch);
         client.getMemoryLimitController().releaseMemory(currentBatchSize);
+        batchMessageContainer.discard(ex);
     }
 
     @Override


[pulsar] 02/04: [fix][txn] fix ack with txn compute ackedCount error (#17016)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a0eb84ef1947d0fbb0594b393558162529c47828
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 1 12:29:04 2022 +0800

    [fix][txn] fix ack with txn compute ackedCount error (#17016)
    
    Co-authored-by: congbobo184 <co...@github.com>
    (cherry picked from commit 176b0d6e9e0d647c611cfdd359e5088ccb58788c)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 25 ++++++------
 .../client/impl/TransactionEndToEndTest.java       | 46 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 14faddf9d63..af7bfed4e7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -496,25 +496,28 @@ public class Consumer {
         LongAdder totalAckCount = new LongAdder();
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
-            PositionImpl position;
+            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+            // acked count at least one
             long ackedCount = 0;
-            long batchSize = getBatchSize(msgId);
+            long batchSize = 0;
+            if (msgId.hasBatchSize()) {
+                batchSize = msgId.getBatchSize();
+                // ack batch messages set ackedCount = batchSize
+                ackedCount = msgId.getBatchSize();
+                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+            } else {
+                // ack no batch message set ackedCount = 1
+                ackedCount = 1;
+                positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+            }
             Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
                 for (int j = 0; j < msgId.getAckSetsCount(); j++) {
                     ackSets[j] = msgId.getAckSetAt(j);
                 }
-                position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
+                position.setAckSet(ackSets);
                 ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
-            } else {
-                position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
-                ackedCount = batchSize;
-            }
-            if (msgId.hasBatchSize()) {
-                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
-            } else {
-                positionsAcked.add(new MutablePair<>(position, (int) batchSize));
             }
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 273013d6f51..7ff26ddd309 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1108,6 +1108,52 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception {
+        final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks";
+        final String subName = "test";
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subName)
+                .subscribe();
+
+        // send 5 messages with one batch
+        for (int i = 0; i < 5; i++) {
+            producer.sendAsync((i + "").getBytes(UTF_8));
+        }
+
+        List<MessageId> messageIds = new ArrayList<>();
+
+        // receive the batch messages add to a list
+        for (int i = 0; i < 5; i++) {
+            messageIds.add(consumer.receive().getMessageId());
+        }
+
+        MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
+
+
+        // remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck
+        getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+                .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
+                .remove(messageId.ledgerId, messageId.entryId);
+
+        Transaction txn = getTxn();
+        consumer.acknowledgeAsync(messageIds.get(1), txn).get();
+
+        // ack one message, the unack count is 4
+        assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
+                .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
+    }
+
     @Test
     public void testSendTxnAckMessageToDLQ() throws Exception {
         String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";