Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/29 13:48:48 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #13383: Fix batch message ack does not decrease the unacked-msg count.

eolivelli commented on a change in pull request #13383:
URL: https://github.com/apache/pulsar/pull/13383#discussion_r776330462



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -233,7 +236,7 @@ public boolean readCompacted() {
             writePromise.setSuccess(null);
             return writePromise;
         }
-
+        int unackedMessage = totalMessages;

Review comment:
       Typo: change to unackedMessages (plural)

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -489,6 +552,31 @@ private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageId
         }
     }
 
+    private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
+        Consumer ackOwnerConsumer = this;
+        if (Subscription.isIndividualAckMode(subType)) {
+            for (Consumer consumer : subscription.getConsumers()) {
+                if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {

Review comment:
       Here it is better to use '==' for this comparison against 'this'
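
   A minimal sketch of the difference, assuming a hypothetical `Worker` class whose `equals` compares by name (a stand-in for illustration, not the real `Consumer`). `equals` can match a distinct instance, while `==` only matches the object itself:

```java
import java.util.Objects;

public class IdentityVsEquals {
    // Hypothetical stand-in for a class that overrides equals(); not the real Consumer.
    static final class Worker {
        private final String name;

        Worker(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Worker && Objects.equals(name, ((Worker) o).name);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(name);
        }

        // Identity check: true only when 'other' is this very object.
        boolean isSelf(Worker other) {
            return other == this;
        }
    }

    public static void main(String[] args) {
        Worker a = new Worker("c1");
        Worker b = new Worker("c1"); // distinct instance, same name
        System.out.println(a.equals(b)); // true
        System.out.println(a.isSelf(b)); // false
        System.out.println(a.isSelf(a)); // true
    }
}
```

   When the goal is "skip myself while iterating", the identity check states that intent directly and does not depend on how `equals` is implemented.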

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -476,6 +499,46 @@ public void doUnsubscribe(final long requestId) {
         return completableFuture;
     }
 
+    private long getBatchSize(MessageIdData msgId) {
+        long batchSize = 1;
+        if (Subscription.isIndividualAckMode(subType)) {
+            LongPair longPair = pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
+            // Consumer may ack the msg that not belongs to it.
+            if (longPair == null) {
+                Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
+                longPair = ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
+                if (longPair != null) {
+                    batchSize = longPair.first;
+                }
+            } else {
+                batchSize = longPair.first;
+            }
+        }
+        return batchSize;
+    }
+
+    private long getAckedCount(PositionImpl position, long batchSize, long[] ackSets) {
+        long ackedCount;
+        if (isDeletionAtBatchIndexLevelEnabled()) {
+            long[] cursorAckSet = getCursorAckSet(position);
+            if (cursorAckSet != null) {
+                BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
+                int lastCardinality = cursorBitSet.cardinality();
+                BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets);
+                cursorBitSet.and(givenBitSet);
+                givenBitSet.recycle();
+                int currentCardinality = cursorBitSet.cardinality();
+                ackedCount = Math.abs(currentCardinality - lastCardinality);

Review comment:
       Why do we need to use 'abs' here?
   
   If the difference can be negative, 'abs' only hides that and we end up setting a meaningless value
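
   To illustrate the concern, here is a minimal sketch that uses `java.util.BitSet` as a stand-in for `BitSetRecyclable` (an assumption; the helper name `clearedByAnd` is hypothetical). An AND can only clear bits, so the cardinality before the AND is never smaller than the cardinality after it:

```java
import java.util.BitSet;

public class AckSetCardinality {
    // Counts how many bits the AND with 'givenWords' clears in 'cursorWords'.
    // java.util.BitSet is used here as a stand-in for BitSetRecyclable.
    static int clearedByAnd(long[] cursorWords, long[] givenWords) {
        BitSet cursor = BitSet.valueOf(cursorWords);
        int before = cursor.cardinality();
        cursor.and(BitSet.valueOf(givenWords));
        int after = cursor.cardinality();
        return before - after; // never negative: AND cannot set a bit
    }

    public static void main(String[] args) {
        // Cursor has bits 0..3 set (0b1111); the given set keeps only bits 2 and 3 (0b1100).
        System.out.println(clearedByAnd(new long[] {0b1111L}, new long[] {0b1100L})); // 2
        // A given set that clears nothing.
        System.out.println(clearedByAnd(new long[] {0b1111L}, new long[] {0b1111L})); // 0
    }
}
```

   If the same property holds for `BitSetRecyclable`, then `lastCardinality - currentCardinality` would express the intent without 'abs'.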

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
##########
@@ -66,6 +68,7 @@
     @BeforeClass
     @Override
     protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

Review comment:
       There are two problems here:
   - this line affects all the tests in this class, so you are changing existing tests
   - we are not testing what happens when the value is 'false' (so some of your new branches won't be touched by this test)
   
   So it is probably better to run this class with both 'true' and 'false'

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1815,13 +1815,13 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs, boolean ackReceiptE
                     messages.add(msg);
                     totalReceiveMessages++;
                     consumer1.acknowledgeAsync(msg);
-                    log.info("Received message: " + new String(msg.getData()));
                 } else {
                     break;
                 }
             }
             // verify total-consumer messages = total-produce messages
-            assertEquals(totalProducedMsgs, totalReceiveMessages);
+            final int finalTotalReceiveMessages = totalReceiveMessages;
+            Awaitility.await().untilAsserted(() -> assertEquals(totalProducedMsgs, finalTotalReceiveMessages));

Review comment:
       Using Awaitility seems useless here, as it looks like you are comparing two constants
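
   A minimal sketch of why retrying does not help, using a hypothetical `pollUntil` helper as a simplified stand-in for a retrying assertion (this is not Awaitility's API). The lambda captures a value that was frozen before the wait started, so every attempt evaluates the same comparison:

```java
import java.util.function.BooleanSupplier;

public class RetryOnConstants {
    // Hypothetical polling helper: returns the attempt number on which the
    // condition first held, or -1 if it never held within maxAttempts.
    static int pollUntil(BooleanSupplier condition, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (condition.getAsBoolean()) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int produced = 10;
        int received = 7;
        final int snapshot = received; // value is frozen at this point
        received = 10;                 // later updates are not seen by the lambda

        // Every attempt compares 10 with 7, so retrying never changes the outcome.
        System.out.println(pollUntil(() -> produced == snapshot, 5)); // -1
        System.out.println(received); // 10
    }
}
```

   The comparison either passes on the first attempt or never passes, so a plain assertion gives the same result.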



