You are viewing a plain text version of this content. The canonical link to the original was a hyperlink and is not preserved in this copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:20 UTC
[pulsar] 15/29: [fix][broker][monitoring] fix message ack rate (#16108)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4ce967ea6727afa0a36245b5755fa67f43a170da
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Jun 21 15:53:03 2022 +0800
[fix][broker][monitoring] fix message ack rate (#16108)
(cherry picked from commit 8869d8c18361fcd5fcf731f9edf2d38ae07cc0cf)
---
.../org/apache/pulsar/broker/service/Consumer.java | 32 ++++---
.../pulsar/broker/stats/ConsumerStatsTest.java | 97 +++++++++++++++-------
2 files changed, 87 insertions(+), 42 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 349fbd860e4..9b0d678ffc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -368,7 +368,7 @@ public class Consumer {
}
public CompletableFuture<Void> messageAcked(CommandAck ack) {
- CompletableFuture<Void> future;
+ CompletableFuture<Long> future;
this.lastAckedTimestamp = System.currentTimeMillis();
Map<String, Long> properties = Collections.emptyMap();
@@ -404,11 +404,12 @@ public class Consumer {
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
List<PositionImpl> positionsAcked = Collections.singletonList(position);
future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
- ack.getTxnidLeastBits(), positionsAcked);
+ ack.getTxnidLeastBits(), positionsAcked)
+ .thenApply(unused -> 1L);
} else {
List<Position> positionsAcked = Collections.singletonList(position);
subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
- future = CompletableFuture.completedFuture(null);
+ future = CompletableFuture.completedFuture(1L);
}
} else {
if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
@@ -419,16 +420,16 @@ public class Consumer {
}
return future
- .whenComplete((__, t) -> {
- if (t == null) {
- this.messageAckRate.recordEvent(ack.getMessageIdsCount());
- }
+ .thenApply(v -> {
+ this.messageAckRate.recordEvent(v);
+ return null;
});
}
//this method is for individual ack not carry the transaction
- private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
+ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
+ long totalAckCount = 0;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
@@ -461,10 +462,12 @@ public class Consumer {
checkCanRemovePendingAcksAndHandle(position, msgId);
checkAckValidationError(ack, position);
+
+ totalAckCount += ackedCount;
}
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- completableFuture.complete(null);
+ CompletableFuture<Long> completableFuture = new CompletableFuture<>();
+ completableFuture.complete(totalAckCount);
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
//check if the position can remove from the consumer pending acks.
@@ -482,7 +485,7 @@ public class Consumer {
//this method is for individual ack carry the transaction
- private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
+ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
// Individual ack
List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();
if (!isTransactionEnabled()) {
@@ -490,6 +493,7 @@ public class Consumer {
new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
}
+ LongAdder totalAckCount = new LongAdder();
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
@@ -518,6 +522,8 @@ public class Consumer {
checkCanRemovePendingAcksAndHandle(position, msgId);
checkAckValidationError(ack, position);
+
+ totalAckCount.add(ackedCount);
}
CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
@@ -533,7 +539,7 @@ public class Consumer {
}
}));
}
- return completableFuture;
+ return completableFuture.thenApply(__ -> totalAckCount.sum());
}
private long getBatchSize(MessageIdData msgId) {
@@ -756,9 +762,9 @@ public class Consumer {
messageAckRate.calculateRate();
stats.msgRateOut = msgOut.getRate();
- stats.messageAckRate = messageAckRate.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
+ stats.messageAckRate = messageAckRate.getValueRate();
stats.chunkedMessageRate = chunkedMessageRate.getRate();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 412efe69632..e35be235f3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -53,6 +53,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
@@ -249,56 +250,94 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
throws Exception {
- final int messages = 100;
+ final int messages = 1000;
String subName = "test_sub";
+ CountDownLatch latch = new CountDownLatch(messages);
@Cleanup
- Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
+ .enableBatching(true).batchingMaxMessages(10).create();
+
+ MessageListener<String> listener = (consumer, msg) -> {
+ try {
+ consumer.acknowledge(msg);
+ latch.countDown();
+ } catch (PulsarClientException e) {
+ //ignore
+ }
+ };
@Cleanup
- Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
- .subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
+ Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener(listener)
+ .subscribe();
+ @Cleanup
+ Consumer<String> c2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener(listener)
+ .subscribe();
String namespace = TopicName.get(topicName).getNamespace();
for (int i = 0; i < messages; i++) {
- producer.send(UUID.randomUUID().toString());
+ producer.sendAsync(UUID.randomUUID().toString());
}
+ producer.flush();
- for (int i = 0; i < messages; i++) {
- Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
- if (message == null) {
- break;
- }
-
- consumer.acknowledge(message);
- }
+ latch.await(20, TimeUnit.SECONDS);
+ TimeUnit.SECONDS.sleep(1);
Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
Subscription subscription = topic.getSubscription(subName);
List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
- Assert.assertEquals(consumers.size(), 1);
+ Assert.assertEquals(consumers.size(), 2);
org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0);
+ org.apache.pulsar.broker.service.Consumer consumer2 = consumers.get(1);
consumer1.updateRates();
+ consumer2.updateRates();
ByteArrayOutputStream output = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
String metricStr = output.toString(StandardCharsets.UTF_8.name());
Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
- Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
- Assert.assertTrue(metrics.size() > 0);
-
- int num = 0;
- for (PrometheusMetricsTest.Metric metric : metrics) {
- if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) {
- num++;
- Assert.assertTrue(metric.value > 0);
- } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
- num++;
- Assert.assertTrue(metric.value > 0);
- }
+ Collection<PrometheusMetricsTest.Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate");
+
+ String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
+ Collection<PrometheusMetricsTest.Metric> rateOutMetric = metricsMap.get(rateOutMetricName);
+ Assert.assertTrue(ackRateMetric.size() > 0);
+ Assert.assertTrue(rateOutMetric.size() > 0);
+
+ if (exposeTopicLevelMetrics) {
+ String consumer1Name = consumer1.consumerName();
+ String consumer2Name = consumer2.consumerName();
+ double totalAckRate = ackRateMetric.stream()
+ .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
+ || metric.tags.get("consumer_name").equals(consumer2Name))
+ .mapToDouble(metric -> metric.value).sum();
+ double totalRateOut = rateOutMetric.stream()
+ .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
+ || metric.tags.get("consumer_name").equals(consumer2Name))
+ .mapToDouble(metric -> metric.value).sum();
+
+ Assert.assertTrue(totalAckRate > 0D);
+ Assert.assertTrue(totalRateOut > 0D);
+ Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
+ } else {
+ double totalAckRate = ackRateMetric.stream()
+ .filter(metric -> namespace.equals(metric.tags.get("namespace")))
+ .mapToDouble(metric -> metric.value).sum();
+ double totalRateOut = rateOutMetric.stream()
+ .filter(metric -> namespace.equals(metric.tags.get("namespace")))
+ .mapToDouble(metric -> metric.value).sum();
+
+ Assert.assertTrue(totalAckRate > 0D);
+ Assert.assertTrue(totalRateOut > 0D);
+ Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
}
-
- Assert.assertTrue(num > 0);
}
}