You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:24 UTC
[pulsar] 10/14: [Broker] Fix precision issue and initial value for Consumer#avgMessagesPerEntry (#14666)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 35b2565185d64e981d8b78bd9f4393f5f572e4b3
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Mar 16 11:37:36 2022 +0800
[Broker] Fix precision issue and initial value for Consumer#avgMessagesPerEntry (#14666)
### Motivation
1. Precision issue
Using an int type for `Consumer#avgMessagesPerEntry` causes a precision issue, because the fractional part of the moving average is truncated on every update.
```
tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent
+ (1 - avgPercent) * totalMessages / entries.size());
```
For example, if `tmpAvgMessagesPerEntry` = 1 and the new value of `totalMessages / entries.size()` is always 5, each update computes floor(1 * 0.9 + 0.1 * 5) = floor(1.4) = 1, so `tmpAvgMessagesPerEntry` stays at 1 and never increases.
2. Initial value issue.
The initial value of 1000 is confusing for users who see it in consumerStats, and it takes quite a long time to decay toward the real average if the message rate is very slow.
### Modifications
1. Change type of avgMessagesPerEntry to double.
2. Change the initial value from 1000 to 0, and seed the average with the first observed `totalMessages / entries.size()` instead of blending it with the initial value.
(cherry picked from commit de2e6c8ce5821b945cbdd0cb2969234667b017ec)
---
.../org/apache/pulsar/broker/service/Consumer.java | 28 ++++++++++++----------
.../PersistentDispatcherMultipleConsumers.java | 3 ++-
.../PersistentDispatcherSingleActiveConsumer.java | 2 +-
3 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c30b991..dbc425d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList;
@@ -119,12 +120,10 @@ public class Consumer {
/**
* It starts keep tracking the average messages per entry.
- * The initial value is 1000, when new value comes, it will update with
+ * The initial value is 0, when new value comes, it will update with
* avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value.
*/
- private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY =
- AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
- private volatile int avgMessagesPerEntry = 1000;
+ private final AtomicDouble avgMessagesPerEntry = new AtomicDouble(0);
private static final long [] EMPTY_ACK_SET = new long[0];
private static final double avgPercent = 0.9;
@@ -172,7 +171,6 @@ public class Consumer {
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
- AVG_MESSAGES_PER_ENTRY.set(this, 1000);
this.metadata = metadata != null ? metadata : Collections.emptyMap();
@@ -283,10 +281,13 @@ public class Consumer {
}
// calculate avg message per entry
- int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this);
- tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent
- + (1 - avgPercent) * totalMessages / entries.size());
- AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);
+ if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1
+ // set init value.
+ avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
+ } else {
+ avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
+ + (1 - avgPercent) * totalMessages / entries.size());
+ }
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
@@ -294,7 +295,7 @@ public class Consumer {
if (log.isDebugEnabled()){
log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer"
+ " for consumerId: {}; avgMessagesPerEntry is {}",
- topicName, subscription, ackedCount, totalMessages, consumerId, tmpAvgMessagesPerEntry);
+ topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get());
}
incrementUnackedMessages(unackedMessages);
msgOut.recordMultipleEvents(totalMessages, totalBytes);
@@ -701,8 +702,11 @@ public class Consumer {
return MESSAGE_PERMITS_UPDATER.get(this);
}
+ /**
+ * return 0 if there is no entry dispatched yet.
+ */
public int getAvgMessagesPerEntry() {
- return AVG_MESSAGES_PER_ENTRY.get(this);
+ return (int) Math.round(avgMessagesPerEntry.get());
}
public boolean isBlocked() {
@@ -747,7 +751,7 @@ public class Consumer {
}
unackedMessages = consumerStats.unackedMessages;
blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs;
- AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
+ avgMessagesPerEntry.set(consumerStats.avgMessagesPerEntry);
}
public ConsumerStatsImpl getStats() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5aa351b..092bc79 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -294,8 +294,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
Consumer c = getRandomConsumer();
// if turn on precise dispatcher flow control, adjust the record to read
if (c != null && c.isPreciseDispatcherFlowControl()) {
+ int avgMessagesPerEntry = Math.max(1, c.getAvgMessagesPerEntry());
messagesToRead = Math.min(
- (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()),
+ (int) Math.ceil(currentTotalAvailablePermits * 1.0 / avgMessagesPerEntry),
readBatchSize);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6a715df..e2f2ac9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -407,7 +407,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
// if turn of precise dispatcher flow control, adjust the records to read
if (consumer.isPreciseDispatcherFlowControl()) {
- int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
+ int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry());
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
}