You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:24 UTC

[pulsar] 10/14: [Broker] Fix precision issue and initial value for Consumer#avgMessagesPerEntry (#14666)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 35b2565185d64e981d8b78bd9f4393f5f572e4b3
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Mar 16 11:37:36 2022 +0800

    [Broker] Fix precision issue and initial value for Consumer#avgMessagesPerEntry (#14666)
    
    ### Motivation
    
    1. Precision issue
    There is a precision issue when using the int type for `Consumer#avgMessagesPerEntry`.
    ```
    tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent
                    + (1 - avgPercent) * totalMessages / entries.size());
    ```
    
    For example, if `tmpAvgMessagesPerEntry` = 1 and the new value of `totalMessages / entries.size()` is always 5, then with `avgPercent` = 0.9 the update computes `floor(1 * 0.9 + 0.1 * 5) = floor(1.4) = 1`, so `tmpAvgMessagesPerEntry` stays at 1 and never increases.
    
    2.  Initial value issue.
    The initial value of 1000 is confusing for users when it appears in consumerStats, and it needs quite a long time to decrease if the message rate is very slow.
    
    ### Modifications
    
    1. Change type of avgMessagesPerEntry to double.
    2. Change the initial value from 1000 to the first observed `totalMessages / entries.size()`.
    
    (cherry picked from commit de2e6c8ce5821b945cbdd0cb2969234667b017ec)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 28 ++++++++++++----------
 .../PersistentDispatcherMultipleConsumers.java     |  3 ++-
 .../PersistentDispatcherSingleActiveConsumer.java  |  2 +-
 3 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c30b991..dbc425d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AtomicDouble;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 import java.util.ArrayList;
@@ -119,12 +120,10 @@ public class Consumer {
 
     /**
      * It starts keep tracking the average messages per entry.
-     * The initial value is 1000, when new value comes, it will update with
+     * The initial value is 0, when new value comes, it will update with
      * avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value.
      */
-    private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY =
-            AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
-    private volatile int avgMessagesPerEntry = 1000;
+    private final AtomicDouble avgMessagesPerEntry = new AtomicDouble(0);
     private static final long [] EMPTY_ACK_SET = new long[0];
 
     private static final double avgPercent = 0.9;
@@ -172,7 +171,6 @@ public class Consumer {
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
         MESSAGE_PERMITS_UPDATER.set(this, 0);
         UNACKED_MESSAGES_UPDATER.set(this, 0);
-        AVG_MESSAGES_PER_ENTRY.set(this, 1000);
 
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
@@ -283,10 +281,13 @@ public class Consumer {
         }
 
         // calculate avg message per entry
-        int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this);
-        tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent
-                + (1 - avgPercent) * totalMessages / entries.size());
-        AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);
+        if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1
+            // set init value.
+            avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
+        } else {
+            avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
+                    + (1 - avgPercent) * totalMessages / entries.size());
+        }
 
         // reduce permit and increment unackedMsg count with total number of messages in batch-msgs
         int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
@@ -294,7 +295,7 @@ public class Consumer {
         if (log.isDebugEnabled()){
             log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer"
                             + " for consumerId: {}; avgMessagesPerEntry is {}",
-                   topicName, subscription, ackedCount, totalMessages, consumerId, tmpAvgMessagesPerEntry);
+                   topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get());
         }
         incrementUnackedMessages(unackedMessages);
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
@@ -701,8 +702,11 @@ public class Consumer {
         return MESSAGE_PERMITS_UPDATER.get(this);
     }
 
+    /**
+     * return 0 if there is no entry dispatched yet.
+     */
     public int getAvgMessagesPerEntry() {
-        return AVG_MESSAGES_PER_ENTRY.get(this);
+        return (int) Math.round(avgMessagesPerEntry.get());
     }
 
     public boolean isBlocked() {
@@ -747,7 +751,7 @@ public class Consumer {
         }
         unackedMessages = consumerStats.unackedMessages;
         blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs;
-        AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
+        avgMessagesPerEntry.set(consumerStats.avgMessagesPerEntry);
     }
 
     public ConsumerStatsImpl getStats() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5aa351b..092bc79 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -294,8 +294,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         Consumer c = getRandomConsumer();
         // if turn on precise dispatcher flow control, adjust the record to read
         if (c != null && c.isPreciseDispatcherFlowControl()) {
+            int avgMessagesPerEntry = Math.max(1, c.getAvgMessagesPerEntry());
             messagesToRead = Math.min(
-                    (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()),
+                    (int) Math.ceil(currentTotalAvailablePermits * 1.0 / avgMessagesPerEntry),
                     readBatchSize);
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6a715df..e2f2ac9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -407,7 +407,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
         // if turn of precise dispatcher flow control, adjust the records to read
         if (consumer.isPreciseDispatcherFlowControl()) {
-            int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
+            int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry());
             messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
         }