You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/01 13:13:38 UTC

[pulsar] branch branch-2.9 updated: Fix calculate avg message per entry (#17281)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4fad35366e6 Fix calculate avg message per entry (#17281)
4fad35366e6 is described below

commit 4fad35366e6ba090a3bb81b4dd752c0614be0170
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Sep 1 21:13:32 2022 +0800

    Fix calculate avg message per entry (#17281)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 21 ++++++++-------
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 30 ++++++++++++----------
 2 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4ff0a48ba1e..297aefee6e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -245,13 +245,16 @@ public class Consumer {
             return writePromise;
         }
         int unackedMessages = totalMessages;
-        // Note
-        // Must ensure that the message is written to the pendingAcks before sent is first , because this consumer
-        // is possible to disconnect at this time.
-        if (pendingAcks != null) {
-            for (int i = 0; i < entries.size(); i++) {
-                Entry entry = entries.get(i);
-                if (entry != null) {
+        int totalEntries = 0;
+
+        for (int i = 0; i < entries.size(); i++) {
+            Entry entry = entries.get(i);
+            if (entry != null) {
+                totalEntries++;
+                // Note:
+                // The message must be written to pendingAcks before it is sent,
+                // because this consumer may disconnect at this time.
+                if (pendingAcks != null) {
                     int batchSize = batchSizes.getBatchSize(i);
                     int stickyKeyHash = getStickyKeyHash(entry);
                     long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
@@ -272,10 +275,10 @@ public class Consumer {
         // calculate avg message per entry
         if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1
             // set init value.
-            avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
+            avgMessagesPerEntry.set(1.0 * totalMessages / totalEntries);
         } else {
             avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
-                    + (1 - avgPercent) * totalMessages / entries.size());
+                    + (1 - avgPercent) * totalMessages / totalEntries);
         }
 
         // reduce permit and increment unackedMsg count with total number of messages in batch-msgs
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 24220625f91..1595428f320 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -22,17 +22,31 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
@@ -40,16 +54,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.UUID;
-
 @Slf4j
 @Test(groups = "broker")
 public class ConsumerStatsTest extends ProducerConsumerBase {