You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/09/01 03:54:48 UTC

[pulsar] branch master updated: support use `BitSet` generate the `BatchMessageAcker` (#7909)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 81202e1  support use `BitSet` generate the `BatchMessageAcker` (#7909)
81202e1 is described below

commit 81202e147f9460e3ab60d82fe0ad55510a258ed6
Author: ran <ga...@126.com>
AuthorDate: Tue Sep 1 11:54:21 2020 +0800

    support use `BitSet` generate the `BatchMessageAcker` (#7909)
    
    Motivation
    Currently, we have to know the batchSize to generate BatchMessageAcker. If we could get the batch index ack bitSet from Broker we could generate the BatchMessageAcker by the bitSet, this is useful for consuming transaction messages, we don't need to change the protocol to get the total message number of one transaction.
    
    Modifications
    Add a new static method to generate the BatchMessageAcker by BitSet.
---
 .../pulsar/client/impl/BatchMessageAcker.java      |  5 +++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  9 +++++++--
 .../PersistentAcknowledgmentsGroupingTracker.java  | 18 +++++++++++++----
 .../pulsar/client/impl/BatchMessageAckerTest.java  | 12 +++++++++++
 .../collections/ConcurrentBitSetRecyclable.java    |  7 +++++++
 .../ConcurrentBitSetRecyclableTest.java            | 23 ++++++++++++++++++++++
 6 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index d46d3b3..e34d1a1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -34,6 +34,11 @@ class BatchMessageAcker {
         return new BatchMessageAcker(bitSet, batchSize);
     }
 
+    // Use the param bitSet as the BatchMessageAcker's bitSet, don't care about the batchSize.
+    static BatchMessageAcker newAcker(BitSet bitSet) {
+        return new BatchMessageAcker(bitSet, -1);
+    }
+
     // bitset shared across messages in the same batch.
     private final int batchSize;
     private final BitSet bitSet;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b1df6f8..cfaaa89 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1356,16 +1356,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // create ack tracker for entry aka batch
         MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
                 getPartitionIndex());
-        BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
         List<MessageImpl<T>> possibleToDeadLetter = null;
         if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
             possibleToDeadLetter = new ArrayList<>();
         }
-        int skippedMessages = 0;
+
+        BatchMessageAcker acker;
         BitSetRecyclable ackBitSet = null;
         if (ackSet != null && ackSet.size() > 0) {
             ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
+            acker = BatchMessageAcker.newAcker(BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)));
+        } else {
+            acker = BatchMessageAcker.newAcker(batchSize);
         }
+
+        int skippedMessages = 0;
         try {
             int startBatchIndex = Math.max(messageId.getBatchIndex(), 0);
             for (int i = startBatchIndex; i < batchSize; ++i) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 6a4deef..fd61c42 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -168,8 +168,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         } else if (ackType == AckType.Individual) {
             ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
                 new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> {
-                    ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create();
-                    value.set(0, batchSize);
+                    ConcurrentBitSetRecyclable value;
+                    if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
+                        value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
+                    } else {
+                        value = ConcurrentBitSetRecyclable.create();
+                        value.set(0, batchSize);
+                    }
                     return value;
                 });
             bitSet.clear(batchIndex);
@@ -221,8 +226,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         if (cnx == null) {
             return false;
         }
-        BitSetRecyclable bitSet = BitSetRecyclable.create();
-        bitSet.set(0, batchSize);
+        BitSetRecyclable bitSet;
+        if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
+            bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray());
+        } else {
+            bitSet = BitSetRecyclable.create();
+            bitSet.set(0, batchSize);
+        }
         if (ackType == AckType.Cumulative) {
             bitSet.clear(0, batchIndex + 1);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
index 2bfa620..8c1565e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
@@ -22,9 +22,12 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.BitSet;
+
 public class BatchMessageAckerTest {
 
     private static final int BATCH_SIZE = 10;
@@ -68,4 +71,13 @@ public class BatchMessageAckerTest {
         assertEquals(0, acker.getOutstandingAcks());
     }
 
+    @Test
+    public void testBitSetAcker() {
+        BitSet bitSet = BitSet.valueOf(acker.getBitSet().toLongArray());
+        BatchMessageAcker bitSetAcker = BatchMessageAcker.newAcker(bitSet);
+
+        Assert.assertEquals(acker.getBitSet(), bitSetAcker.getBitSet());
+        Assert.assertEquals(acker.getOutstandingAcks(), bitSetAcker.getOutstandingAcks());
+    }
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
index 8e787c1..21ee42b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util.collections;
 
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import java.util.BitSet;
 
 /**
  * Safe multithreaded version of {@code BitSet} and leverage netty recycler.
@@ -43,6 +44,12 @@ public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
         return RECYCLER.get();
     }
 
+    public static ConcurrentBitSetRecyclable create(BitSet bitSet) {
+        ConcurrentBitSetRecyclable recyclable = RECYCLER.get();
+        recyclable.or(bitSet);
+        return recyclable;
+    }
+
     public void recycle() {
         this.clear();
         recyclerHandle.recycle(this);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
index b037c70..e937176 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.util.collections;
 
+import java.util.BitSet;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -34,4 +35,26 @@ public class ConcurrentBitSetRecyclableTest {
         Assert.assertFalse(bitset2.get(3));
         Assert.assertNotSame(bitset3, bitset1);
     }
+
+    @Test
+    public void testGenerateByBitSet() {
+        BitSet bitSet = new BitSet();
+        ConcurrentBitSetRecyclable bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
+        Assert.assertEquals(bitSet, bitSetRecyclable);
+
+        bitSet.set(0, 10);
+        bitSetRecyclable.recycle();
+        bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
+        Assert.assertEquals(bitSet, bitSetRecyclable);
+
+        bitSet.clear(5);
+        bitSetRecyclable.recycle();
+        bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
+        Assert.assertEquals(bitSet, bitSetRecyclable);
+
+        bitSet.clear();
+        bitSetRecyclable.recycle();
+        bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet);
+        Assert.assertEquals(bitSet, bitSetRecyclable);
+    }
 }