You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/02/27 06:26:37 UTC

[pulsar] branch branch-2.9 updated (f8a1a847b7c -> 12329c3c42d)

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from f8a1a847b7c [fix][broker][branch-2.9] Fix geo-replication admin (#19614)
     new 42261c67a6b [fix][client] Fix reader listener can't auto ack with pooled message. (#19354)
     new 2140ed4a9a8 [fix] [ml] messagesConsumedCounter of NonDurableCursor was initialized incorrectly (#19355)
     new 28290bb70a0 [fix][client] Fix async completion in ConsumerImpl#processPossibleToDLQ (#19392)
     new 12329c3c42d [fix][ml]  Fix potential NPE cause future never complete. (#19415)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mledger/impl/NonDurableCursorImpl.java         |   2 +-
 .../mledger/impl/NonDurableCursorTest.java         |  15 +++
 .../service/schema/SchemaRegistryServiceImpl.java  |   9 +-
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 113 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   1 +
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |   7 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   7 +-
 7 files changed, 146 insertions(+), 8 deletions(-)


[pulsar] 02/04: [fix] [ml] messagesConsumedCounter of NonDurableCursor was initialized incorrectly (#19355)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2140ed4a9a88c1d33080fcd4adf28a32d7d76178
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Jan 31 10:26:06 2023 +0800

    [fix] [ml] messagesConsumedCounter of NonDurableCursor was initialized incorrectly (#19355)
    
    (cherry picked from commit fc9e8bf310185de3685addd439edaee427f532b0)
---
 .../bookkeeper/mledger/impl/NonDurableCursorImpl.java     |  2 +-
 .../bookkeeper/mledger/impl/NonDurableCursorTest.java     | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index c74a7299eea..4c096d59430 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -74,7 +74,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         // Initialize the counter such that the difference between the messages written on the ML and the
         // messagesConsumed is equal to the current backlog (negated).
         if (null != this.readPosition) {
-            long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
+            long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) <= 0
                 ? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0;
             messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 18b09793e26..d2bb181c3a5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -823,5 +823,20 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         assertEquals(Iterables.size(ledger.getCursors()), 0);
     }
 
+    @Test
+    public void testMessagesConsumedCounterInitializedCorrect() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testMessagesConsumedCounterInitializedCorrect",
+                        new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
+        Position position = ledger.addEntry("1".getBytes(Encoding));
+        NonDurableCursorImpl cursor = (NonDurableCursorImpl) ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+        cursor.delete(position);
+        assertEquals(cursor.getMessagesConsumedCounter(), 1);
+        assertTrue(cursor.getMessagesConsumedCounter() <= ledger.getEntriesAddedCounter());
+        // cleanup.
+        cursor.close();
+        ledger.close();
+    }
+
+
     private static final Logger log = LoggerFactory.getLogger(NonDurableCursorTest.class);
 }


[pulsar] 03/04: [fix][client] Fix async completion in ConsumerImpl#processPossibleToDLQ (#19392)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 28290bb70a016c07337cff9a608cabb1d2e482bb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Feb 2 14:07:19 2023 +0200

    [fix][client] Fix async completion in ConsumerImpl#processPossibleToDLQ (#19392)
    
    (cherry picked from commit 39dd1cda2d01a6d472b7a39115a774958a837be1)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java        | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 89e434d41d4..c4bbdaca797 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1866,6 +1866,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                                     if (ex != null) {
                                         log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original topic but send to the DLQ successfully.",
                                                 topicName, subscription, consumerName, finalMessageId, ex);
+                                        result.complete(false);
                                     } else {
                                         result.complete(true);
                                     }


[pulsar] 01/04: [fix][client] Fix reader listener can't auto ack with pooled message. (#19354)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 42261c67a6bc94fefe6750e667535f79812d0581
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Mon Jan 30 13:42:23 2023 +0800

    [fix][client] Fix reader listener can't auto ack with pooled message. (#19354)
    
    (cherry picked from commit 1b2fa1f2dafc39b5f2bd7e17cc6130066dbc8aad)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 113 +++++++++++++++++++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |   7 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   7 +-
 3 files changed, 125 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 53ae8f97008..6289f5ed784 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -55,6 +56,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -621,4 +623,115 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(internalStats.cursors.size(), 0);
     }
 
+    @Test
+    public void testReaderListenerAcknowledgement()
+            throws IOException, InterruptedException, PulsarAdminException {
+        // non-partitioned topic
+        final String topic = "persistent://my-property/my-ns/" + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.send("0".getBytes(StandardCharsets.UTF_8));
+        // non-pool
+        final CountDownLatch readerNonPoolLatch = new CountDownLatch(1);
+        final Reader<byte[]> readerNonPool = pulsarClient.newReader()
+                .topic(topic)
+                .subscriptionName("reader-non-pool")
+                .startMessageId(MessageId.earliest)
+                .readerListener((innerReader, message) -> {
+                    // no operation
+                    readerNonPoolLatch.countDown();
+                }).create();
+        readerNonPoolLatch.await();
+        Awaitility.await().untilAsserted(() -> {
+            final PersistentTopicInternalStats internal = admin.topics().getInternalStats(topic);
+            final String lastConfirmedEntry = internal.lastConfirmedEntry;
+            Assert.assertTrue(internal.cursors.containsKey("reader-non-pool"));
+            Assert.assertEquals(internal.cursors.get("reader-non-pool").markDeletePosition, lastConfirmedEntry);
+        });
+        // pooled
+        final CountDownLatch readerPooledLatch = new CountDownLatch(1);
+        final Reader<byte[]> readerPooled = pulsarClient.newReader()
+                .topic(topic)
+                .subscriptionName("reader-pooled")
+                .startMessageId(MessageId.earliest)
+                .poolMessages(true)
+                .readerListener((innerReader, message) -> {
+                    try {
+                        // no operation
+                        readerPooledLatch.countDown();
+                    } finally {
+                        message.release();
+                    }
+                }).create();
+        readerPooledLatch.await();
+        Awaitility.await().untilAsserted(() -> {
+            final PersistentTopicInternalStats internal = admin.topics().getInternalStats(topic);
+            final String lastConfirmedEntry = internal.lastConfirmedEntry;
+            Assert.assertTrue(internal.cursors.containsKey("reader-pooled"));
+            Assert.assertEquals(internal.cursors.get("reader-pooled").markDeletePosition, lastConfirmedEntry);
+        });
+        producer.close();
+        readerNonPool.close();
+        readerPooled.close();
+        admin.topics().delete(topic);
+        // ---- partitioned topic
+        final String partitionedTopic = "persistent://my-property/my-ns/" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(partitionedTopic, 2);
+        final Producer<byte[]> producer2 = pulsarClient.newProducer()
+                .topic(partitionedTopic)
+                .create();
+        producer2.send("0".getBytes(StandardCharsets.UTF_8));
+        // non-pool
+        final CountDownLatch readerNonPoolLatch2 = new CountDownLatch(1);
+        final Reader<byte[]> readerNonPool2 = pulsarClient.newReader()
+                .topic(partitionedTopic)
+                .subscriptionName("reader-non-pool")
+                .startMessageId(MessageId.earliest)
+                .readerListener((innerReader, message) -> {
+                    // no operation
+                    readerNonPoolLatch2.countDown();
+                }).create();
+        readerNonPoolLatch2.await();
+        Awaitility.await().untilAsserted(() -> {
+            PartitionedTopicInternalStats partitionedInternal =
+                    admin.topics().getPartitionedInternalStats(partitionedTopic);
+            for (PersistentTopicInternalStats internal : partitionedInternal.partitions.values()) {
+                final String lastConfirmedEntry = internal.lastConfirmedEntry;
+                Assert.assertTrue(internal.cursors.containsKey("reader-non-pool"));
+                Assert.assertEquals(internal.cursors.get("reader-non-pool").markDeletePosition, lastConfirmedEntry);
+            }
+        });
+        // pooled
+        final CountDownLatch readerPooledLatch2 = new CountDownLatch(1);
+        final Reader<byte[]> readerPooled2 = pulsarClient.newReader()
+                .topic(partitionedTopic)
+                .subscriptionName("reader-pooled")
+                .startMessageId(MessageId.earliest)
+                .poolMessages(true)
+                .readerListener((innerReader, message) -> {
+                    try {
+                        // no operation
+                        readerPooledLatch2.countDown();
+                    } finally {
+                        message.release();
+                    }
+                }).create();
+        readerPooledLatch2.await();
+        Awaitility.await().untilAsserted(() -> {
+            PartitionedTopicInternalStats partitionedInternal =
+                    admin.topics().getPartitionedInternalStats(partitionedTopic);
+            for (PersistentTopicInternalStats internal : partitionedInternal.partitions.values()) {
+                final String lastConfirmedEntry = internal.lastConfirmedEntry;
+                Assert.assertTrue(internal.cursors.containsKey("reader-pooled"));
+                Assert.assertEquals(internal.cursors.get("reader-pooled").markDeletePosition, lastConfirmedEntry);
+            }
+        });
+        producer2.close();
+        readerNonPool2.close();
+        readerPooled2.close();
+        admin.topics().deletePartitionedTopic(partitionedTopic);
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index b656c005db9..0d4dd87d2c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -73,8 +73,13 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
                 @Override
                 public void received(Consumer<T> consumer, Message<T> msg) {
+                    final MessageId messageId = msg.getMessageId();
                     readerListener.received(MultiTopicsReaderImpl.this, msg);
-                    consumer.acknowledgeCumulativeAsync(msg);
+                    consumer.acknowledgeCumulativeAsync(messageId).exceptionally(ex -> {
+                        log.error("[{}][{}] auto acknowledge message {} cumulative fail.", getTopic(),
+                                getMultiTopicsConsumer().getSubscription(), messageId, ex);
+                        return null;
+                    });
                 }
 
                 @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 72c74e8875d..64ee3c9c8b5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -90,8 +90,13 @@ public class ReaderImpl<T> implements Reader<T> {
 
                 @Override
                 public void received(Consumer<T> consumer, Message<T> msg) {
+                    final MessageId messageId = msg.getMessageId();
                     readerListener.received(ReaderImpl.this, msg);
-                    consumer.acknowledgeCumulativeAsync(msg);
+                    consumer.acknowledgeCumulativeAsync(messageId).exceptionally(ex -> {
+                        log.error("[{}][{}] auto acknowledge message {} cumulative fail.", getTopic(),
+                                getConsumer().getSubscription(), messageId, ex);
+                        return null;
+                    });
                 }
 
                 @Override


[pulsar] 04/04: [fix][ml] Fix potential NPE cause future never complete. (#19415)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 12329c3c42d2f2605a9a74991f93b415c8bd18e1
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Fri Feb 3 21:57:09 2023 +0800

    [fix][ml]  Fix potential NPE cause future never complete. (#19415)
    
    (cherry picked from commit 11073fd4dcb92c61fa2a2641f07c800e940cb319)
---
 .../pulsar/broker/service/schema/SchemaRegistryServiceImpl.java  | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 32e9c56a670..9cfcce419fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -455,12 +455,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
         schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
             List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
             if (ex != null) {
-                boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException)
-                        ? ((SchemaException) ex.getCause()).isRecoverable()
-                        : true;
+                final Throwable rc = FutureUtil.unwrapCompletionException(ex);
+                boolean recoverable = !(rc instanceof SchemaException) || ((SchemaException) rc).isRecoverable();
                 // if error is recoverable then fail the request.
                 if (recoverable) {
-                    schemaResult.completeExceptionally(ex.getCause());
+                    schemaResult.completeExceptionally(rc);
                     return null;
                 }
                 // clean the schema list for recoverable and delete the schema from zk
@@ -473,7 +472,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                 trimDeletedSchemaAndGetList(list);
                 // clean up the broken schema from zk
                 deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
-                    log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", ex.getCause().getMessage(),
+                    log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", rc.getMessage(),
                             schemaId, (th == null ? "successful" : "failed, " + th.getCause().getMessage()));
                     schemaResult.complete(list);
                     return null;