You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/01 13:13:38 UTC
[pulsar] branch branch-2.9 updated: Fix calculate avg message per entry (#17281)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 4fad35366e6 Fix calculate avg message per entry (#17281)
4fad35366e6 is described below
commit 4fad35366e6ba090a3bb81b4dd752c0614be0170
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Sep 1 21:13:32 2022 +0800
Fix calculate avg message per entry (#17281)
---
.../org/apache/pulsar/broker/service/Consumer.java | 21 ++++++++-------
.../pulsar/broker/stats/ConsumerStatsTest.java | 30 ++++++++++++----------
2 files changed, 29 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4ff0a48ba1e..297aefee6e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -245,13 +245,16 @@ public class Consumer {
return writePromise;
}
int unackedMessages = totalMessages;
- // Note
- // Must ensure that the message is written to the pendingAcks before sent is first , because this consumer
- // is possible to disconnect at this time.
- if (pendingAcks != null) {
- for (int i = 0; i < entries.size(); i++) {
- Entry entry = entries.get(i);
- if (entry != null) {
+ int totalEntries = 0;
+
+ for (int i = 0; i < entries.size(); i++) {
+ Entry entry = entries.get(i);
+ if (entry != null) {
+ totalEntries++;
+ // Note
+ // Must ensure that the message is written to the pendingAcks before sent is first,
+ // because this consumer is possible to disconnect at this time.
+ if (pendingAcks != null) {
int batchSize = batchSizes.getBatchSize(i);
int stickyKeyHash = getStickyKeyHash(entry);
long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
@@ -272,10 +275,10 @@ public class Consumer {
// calculate avg message per entry
if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1
// set init value.
- avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
+ avgMessagesPerEntry.set(1.0 * totalMessages / totalEntries);
} else {
avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
- + (1 - avgPercent) * totalMessages / entries.size());
+ + (1 - avgPercent) * totalMessages / totalEntries);
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 24220625f91..1595428f320 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -22,17 +22,31 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
@@ -40,16 +54,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.UUID;
-
@Slf4j
@Test(groups = "broker")
public class ConsumerStatsTest extends ProducerConsumerBase {