Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/29 05:17:14 UTC

[pulsar] branch branch-2.8 updated (3b164f5ac0c -> f45985e7bff)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 3b164f5ac0c Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)"
     new 739c45e8075 [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)
     new 7a3ed537f38 Fix `messageQueue` release message issue. (#16155)
     new b607d09176f [fix][broker] Fix NPE when drop backlog for time limit. (#16235)
     new a97131eeabf [Branch-2.9][Cherry-pick] fix bug: fail to expose managed ledger client stats to prometheus if bookkeeperClientExposeStatsToPrometheus is true #16219 (#16343)
     new f45985e7bff Increase timeout in PersistentStreamingDispatcherBlockConsumerTest.testBlockBrokerDispatching (#12943)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   5 +-
 .../pulsar/broker/service/BacklogQuotaManager.java |   4 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  32 +++++
 .../client/api/DispatcherBlockConsumerTest.java    |   2 +-
 .../PersistentAcknowledgmentsGroupingTracker.java  | 141 +++++++++++----------
 .../pulsar/client/impl/LastCumulativeAckTest.java  |  86 +++++++++++++
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  14 +-
 7 files changed, 205 insertions(+), 79 deletions(-)
 create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java


[pulsar] 04/05: [Branch-2.9][Cherry-pick] fix bug: fail to expose managed ledger client stats to prometheus if bookkeeperClientExposeStatsToPrometheus is true #16219 (#16343)

Posted by xy...@apache.org.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a97131eeabf2febe167d775b4c8de70946f44c5c
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sat Jul 2 21:45:46 2022 +0800

    [Branch-2.9][Cherry-pick] fix bug: fail to expose managed ledger client stats to prometheus if bookkeeperClientExposeStatsToPrometheus is true #16219 (#16343)
    
    (cherry picked from commit d56ae3797c18b1451b58c4b05f60ed23ae321c9f)
---
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  5 ++--
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 32 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 431cb729b22..dc0ac3c5b7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -82,7 +82,8 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
         statsProvider.start(configuration);
         StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_managedLedger_client");
 
-        this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, eventLoopGroup, Optional.empty(), null);
+        this.defaultBkClient =
+                bookkeeperProvider.create(conf, zkClient, eventLoopGroup, Optional.empty(), null, statsLogger);
 
         BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
                 EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
@@ -93,7 +94,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
                     try {
                         return bookkeeperProvider.create(conf, zkClient, eventLoopGroup,
                                 Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
-                                ensemblePlacementPolicyConfig.getProperties());
+                                ensemblePlacementPolicyConfig.getProperties(), statsLogger);
                     } catch (Exception e) {
                         log.error("Failed to initialize bk-client for policy {}, properties {}",
                                 ensemblePlacementPolicyConfig.getPolicyClass(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 68dde0aaa26..c0383cd6e9f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -821,6 +821,38 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_workers_task_execution_count");
         assertEquals(cm.size(), 0);
+
+        cm = (List<Metric>) metrics.get(
+                keyNameBySubstrings(metrics, "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_total_tasks"));
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        cm = (List<Metric>) metrics.get(keyNameBySubstrings(metrics, "pulsar_managedLedger_client",
+                "bookkeeper_ml_scheduler_task_execution_sum"));
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        cm = (List<Metric>) metrics.get(
+                keyNameBySubstrings(metrics,
+                        "pulsar_managedLedger_client", "bookkeeper_ml_scheduler_queue"));
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+    }
+
+    private static String keyNameBySubstrings(Multimap<String, Metric> metrics, String... substrings) {
+        for (String key : metrics.keys()) {
+            boolean found = true;
+            for (String s : substrings) {
+                if (!key.contains(s)) {
+                    found = false;
+                    break;
+                }
+            }
+            if (found) {
+                return key;
+            }
+        }
+        return null;
     }
 
     @Test


[pulsar] 01/05: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)

Posted by xy...@apache.org.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 739c45e807524e003286db66563eb8e4421c89dd
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 22 23:34:49 2022 +0800

    [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)
    
    ### Motivation
    
    Several issues were caused by the lack of thread safety in
    `LastCumulativeAck`, see:
    - https://github.com/apache/pulsar/pull/10586
    - https://github.com/apache/pulsar/pull/12343
    
    The root cause is that `LastCumulativeAck` can be accessed by
    different threads, especially in the `flushAsync` method, but its fields
    are accessed directly, so no thread safety can be guaranteed.
    
    In addition, the current `LastCumulativeAck` class was added in
    https://github.com/apache/pulsar/pull/8996 to hold two object
    references, but that modification is wrong.
    
    Before #8996, there were two CAS operations in the `doCumulativeAck`
    method in case it was called concurrently, although the composite of
    the two CAS operations was not atomic.
    
    However, after #8996, only one CAS operation was performed, and it
    compared against a `LastCumulativeAck` object, not the two fields
    (`messageId` and `bitSetRecyclable`).
    
    Another issue is that a flag, `cumulativeAckFlushRequired`, is used
    to mark whether `lastCumulativeAck` should be flushed. However, if
    `flushAsync` was called concurrently, both calls would send ACK
    commands to the broker.
    
    ### Modifications
    
    To solve the thread safety issue, this PR moves `LastCumulativeAck`
    out of `PersistentAcknowledgmentsGroupingTracker` to prevent direct
    access to its internal fields. The following synchronized methods
    were then added to guarantee thread safety:
    - `update`: Guarantees safe write operations. It also recycles the
      `BitSetRecyclable` object before assigning new values and marks the
      instance as ready to be flushed.
    - `flush`: If the instance can be flushed, returns a thread-local
      `LastCumulativeAck` instance that contains the message ID and the bit
      set. The bit set is deep copied to avoid the original reference being
      recycled in another `update` call.
    
    In addition, since the `messageId` field is volatile, the `getMessageId`
    method can always retrieve the latest reference.
    
    `LastCumulativeAckTest` is added to verify the semantics above.
    
    With the new design, we only need to maintain a single
    `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker`
    and call the related methods in `doCumulativeAck` and `flushAsync`.
    This also fixes the problem that two concurrent `flushAsync` calls
    might send the same ACK command twice.
    
    (cherry picked from commit 936d6fdc780ea454e72e82b6c7a1885799158d02)
---
 .../PersistentAcknowledgmentsGroupingTracker.java  | 141 +++++++++++----------
 .../pulsar/client/impl/LastCumulativeAckTest.java  |  86 +++++++++++++
 2 files changed, 159 insertions(+), 68 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 9ba965d37cc..9bd1c3d2a56 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -33,11 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import io.netty.util.Recycler;
-import lombok.NonNull;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.MessageId;
@@ -69,17 +67,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
     private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
 
-    private volatile LastCumulativeAck lastCumulativeAck =
-            LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
-
-    private volatile boolean cumulativeAckFlushRequired = false;
+    private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
 
     // When we flush the command, we should ensure current ack request will send correct
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, LastCumulativeAck> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
-            .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck");
-
     /**
      * This is a set of all the individual acks that the application has issued and that were not already sent to
      * broker.
@@ -115,13 +107,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * resent after a disconnection and for which the user has already sent an acknowledgement.
      */
     @Override
-    public boolean isDuplicate(@NonNull MessageId messageId) {
-        final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+    public boolean isDuplicate(MessageId messageId) {
+        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
         if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
-            return pendingIndividualAcks.contains(messageId);
+            return pendingIndividualAcks.contains((MessageIdImpl) messageId);
         }
     }
 
@@ -369,30 +361,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
     private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
-        LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
-        while (true) {
-            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
-            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
-                    if (lastCumulativeAck.bitSetRecyclable != null) {
-                        try {
-                            lastCumulativeAck.bitSetRecyclable.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
-                        lastCumulativeAck.bitSetRecyclable = null;
-                    }
-                    lastCumulativeAck.recycle();
-                    // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
-                }
-            } else {
-                currentCumulativeAck.recycle();
-                // message id acknowledging an before the current last cumulative ack
-                return;
-            }
-        }
+        lastCumulativeAck.update(msgId, bitSet);
     }
 
     private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
@@ -473,15 +442,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     }
 
     private void flushAsync(ClientCnx cnx) {
+        final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
         boolean shouldFlush = false;
-        if (cumulativeAckFlushRequired) {
-            newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
-                    lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
-                    AckType.Cumulative, null, Collections.emptyMap(), false,
-                    this.currentCumulativeAckFuture, null);
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+        if (lastCumulativeAckToFlush != null) {
             shouldFlush = true;
-            cumulativeAckFlushRequired = false;
+            final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(),
+                    lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null,
+                    Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
+            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
         }
 
         // Flush all individual acks
@@ -555,7 +524,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     @Override
     public void flushAndClean() {
         flush();
-        lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
+        lastCumulativeAck.reset();
         pendingIndividualAcks.clear();
     }
 
@@ -658,36 +627,72 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         return ackReceiptEnabled && cnx != null
                 && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
     }
+}
 
-    private static class LastCumulativeAck {
-        private MessageIdImpl messageId;
-        private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
 
-        static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
-            LastCumulativeAck op = RECYCLER.get();
-            op.messageId = messageId;
-            op.bitSetRecyclable = bitSetRecyclable;
-            return op;
-        }
+    // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+    public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+            new FastThreadLocal<LastCumulativeAck>() {
 
-        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
+                @Override
+                protected LastCumulativeAck initialValue() {
+                    return new LastCumulativeAck();
+                }
+            };
+    public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
 
-        void recycle() {
-            if (bitSetRecyclable != null) {
+    private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+    private BitSetRecyclable bitSetRecyclable = null;
+    private boolean flushRequired = false;
+
+    public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        if (messageId.compareTo(this.messageId) > 0) {
+            if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
                 this.bitSetRecyclable.recycle();
             }
-            this.messageId = null;
-            recyclerHandle.recycle(this);
+            set(messageId, bitSetRecyclable);
+            flushRequired = true;
         }
+    }
 
-        private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
-        private static final Recycler<LastCumulativeAck> RECYCLER = new Recycler<LastCumulativeAck>() {
-            @Override
-            protected LastCumulativeAck newObject(Handle<LastCumulativeAck> handle) {
-                return new LastCumulativeAck(handle);
+    public synchronized LastCumulativeAck flush() {
+        if (flushRequired) {
+            final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
+            if (bitSetRecyclable != null) {
+                localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
+            } else {
+                localLastCumulativeAck.set(this.messageId, null);
             }
-        };
+            flushRequired = false;
+            return localLastCumulativeAck;
+        } else {
+            // Return null to indicate nothing to be flushed
+            return null;
+        }
+    }
+
+    public synchronized void reset() {
+        if (bitSetRecyclable != null) {
+            bitSetRecyclable.recycle();
+        }
+        messageId = DEFAULT_MESSAGE_ID;
+        bitSetRecyclable = null;
+        flushRequired = false;
+    }
+
+    private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        this.messageId = messageId;
+        this.bitSetRecyclable = bitSetRecyclable;
+    }
+
+    @Override
+    public String toString() {
+        String s = messageId.toString();
+        if (bitSetRecyclable != null) {
+            s += " (bit set: " + bitSetRecyclable + ")";
+        }
+        return s;
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
new file mode 100644
index 00000000000..102ccfc0e07
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.annotations.Test;
+
+public class LastCumulativeAckTest {
+
+    @Test
+    public void testUpdate() {
+        final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        assertFalse(lastCumulativeAck.isFlushRequired());
+        assertEquals(lastCumulativeAck.getMessageId(), LastCumulativeAck.DEFAULT_MESSAGE_ID);
+        assertNull(lastCumulativeAck.getBitSetRecyclable());
+
+        final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10);
+        final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create();
+        bitSetRecyclable1.set(0, 3);
+        lastCumulativeAck.update(messageId1, bitSetRecyclable1);
+        assertTrue(lastCumulativeAck.isFlushRequired());
+        assertSame(lastCumulativeAck.getMessageId(), messageId1);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+        final MessageIdImpl messageId2 = new MessageIdImpl(0L, 2L, 8);
+        lastCumulativeAck.update(messageId2, bitSetRecyclable1);
+        // bitSetRecyclable1 is not recycled
+        assertEquals(bitSetRecyclable1.toString(), "{0, 1, 2}");
+
+        final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create();
+        bitSetRecyclable2.set(0, 2);
+
+        // `update()` only accepts a newer message ID, so this call here has no side effect
+        lastCumulativeAck.update(messageId2, bitSetRecyclable2);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+        final MessageIdImpl messageId3 = new MessageIdImpl(0L, 3L, 9);
+        lastCumulativeAck.update(messageId3, bitSetRecyclable2);
+        // bitSetRecyclable1 is recycled because it's replaced in `update`
+        assertEquals(bitSetRecyclable1.toString(), "{}");
+        assertSame(lastCumulativeAck.getMessageId(), messageId3);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2);
+        bitSetRecyclable2.recycle();
+    }
+
+    @Test
+    public void testFlush() {
+        final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        assertNull(lastCumulativeAck.flush());
+
+        final MessageIdImpl messageId = new MessageIdImpl(0L, 1L, 3);
+        final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+        bitSetRecyclable.set(0, 3);
+        lastCumulativeAck.update(messageId, bitSetRecyclable);
+        assertTrue(lastCumulativeAck.isFlushRequired());
+
+        final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
+        assertFalse(lastCumulativeAck.isFlushRequired());
+        assertSame(lastCumulativeAckToFlush.getMessageId(), messageId);
+        assertNotSame(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+        assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+    }
+
+}
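
The synchronized `update`/`flush` design described in the commit message above can be illustrated with a minimal, self-contained sketch. It is not the Pulsar class: `CumulativeAckHolder`, `Snapshot`, and the demo class are hypothetical names, a plain `long` stands in for `MessageIdImpl`, and `java.util.BitSet` stands in for `BitSetRecyclable` (so nothing is recycled). It also allocates a new snapshot on each flush instead of reusing a `FastThreadLocal` instance as the commit does.

```java
import java.util.BitSet;

// Hypothetical stand-in for the pattern in the commit; not the Pulsar class.
final class CumulativeAckHolder {

    /** Snapshot handed to the caller of flush(); owns its own copy of the bit set. */
    static final class Snapshot {
        final long position;
        final BitSet ackSet; // null when no bit set was recorded

        Snapshot(long position, BitSet ackSet) {
            this.position = position;
            this.ackSet = ackSet;
        }
    }

    private long position = -1L;          // assumption: -1 plays the role of "earliest"
    private BitSet ackSet = null;
    private boolean flushRequired = false;

    /** Accepts only a strictly newer position; returns whether the state changed. */
    synchronized boolean update(long newPosition, BitSet newAckSet) {
        if (newPosition <= position) {
            return false;
        }
        position = newPosition;
        ackSet = newAckSet;
        flushRequired = true;
        return true;
    }

    /** Returns a snapshot at most once per accepted update, or null if nothing is pending. */
    synchronized Snapshot flush() {
        if (!flushRequired) {
            return null;
        }
        flushRequired = false;
        // Copy the bit set so a later update() cannot affect the value being sent.
        BitSet copy = (ackSet == null) ? null : (BitSet) ackSet.clone();
        return new Snapshot(position, copy);
    }
}

final class CumulativeAckHolderDemo {
    public static void main(String[] args) {
        CumulativeAckHolder holder = new CumulativeAckHolder();
        BitSet bits = new BitSet();
        bits.set(0, 3);
        holder.update(10L, bits);

        CumulativeAckHolder.Snapshot first = holder.flush();
        CumulativeAckHolder.Snapshot second = holder.flush();
        // The second flush has nothing pending, so it returns null.
        System.out.println(first.position + " " + first.ackSet + " " + (second == null));
        // prints "10 {0, 1, 2} true"
    }
}
```

Because the flag check and the flag reset happen under the same monitor, two threads calling `flush()` concurrently cannot both observe a pending update, which is the property the commit relies on to avoid sending the same ACK command twice.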


[pulsar] 02/05: Fix `messageQueue` release message issue. (#16155)

Posted by xy...@apache.org.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7a3ed537f383f87e96d263c95a3e2ffd32272601
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jun 21 17:13:00 2022 +0800

    Fix `messageQueue` release message issue. (#16155)
    
    (cherry picked from commit 141c44022a27be2fc07eab9827cfdb168e448953)
---
 .../org/apache/pulsar/sql/presto/PulsarRecordCursor.java   | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index e646dcff4c9..9474376f77c 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -700,14 +700,6 @@ public class PulsarRecordCursor implements RecordCursor {
     public void close() {
         log.info("Closing cursor record");
 
-        if (currentMessage != null) {
-            currentMessage.release();
-        }
-
-        if (messageQueue != null) {
-            messageQueue.drain(RawMessage::release);
-        }
-
         if (entryQueue != null) {
             entryQueue.drain(Entry::release);
         }
@@ -717,6 +709,12 @@ public class PulsarRecordCursor implements RecordCursor {
                 if (entryQueue != null) {
                     entryQueue.drain(Entry::release);
                 }
+                if (messageQueue != null) {
+                    messageQueue.drain(RawMessage::release);
+                }
+                if (currentMessage != null) {
+                    currentMessage.release();
+                }
             });
         }
 


[pulsar] 03/05: [fix][broker] Fix NPE when drop backlog for time limit. (#16235)

Posted by xy...@apache.org.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b607d09176fd2c1586691de2bde5967ebaf4f539
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jun 28 14:39:00 2022 +0800

    [fix][broker] Fix NPE when drop backlog for time limit. (#16235)
    
    (cherry picked from commit d24d82780fd27a98c6cdbee28d756ee7d419495f)
---
 .../java/org/apache/pulsar/broker/service/BacklogQuotaManager.java    | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 65ffaabef96..6414904d73d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -246,6 +246,10 @@ public class BacklogQuotaManager {
                     ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
                     Position oldestPosition = slowestConsumer.getMarkDeletedPosition();
                     ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
+                    if (ledgerInfo == null) {
+                        slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) oldestPosition));
+                        continue;
+                    }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
                             && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {


[pulsar] 05/05: Increase timeout in PersistentStreamingDispatcherBlockConsumerTest.testBlockBrokerDispatching (#12943)

Posted by xy...@apache.org.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f45985e7bfffaee365af268bec6925953b7db5b0
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Tue Nov 23 22:17:16 2021 +0800

    Increase timeout in PersistentStreamingDispatcherBlockConsumerTest.testBlockBrokerDispatching (#12943)
    
    (cherry picked from commit 5abf42cf24b43f7f1874eb0b385e093926ea0b21)
---
 .../java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index cb1f44bf81e..4faf7834e94 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -672,7 +672,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
      * </pre>
      *
      */
-    @Test(timeOut = 10000)
+    @Test(timeOut = 60000)
     public void testBlockBrokerDispatching() {
         log.info("-- Starting {} test --", methodName);