You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:20 UTC

[pulsar] 15/29: [fix][broker][monitoring] fix message ack rate (#16108)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4ce967ea6727afa0a36245b5755fa67f43a170da
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Jun 21 15:53:03 2022 +0800

    [fix][broker][monitoring] fix message ack rate (#16108)
    
    (cherry picked from commit 8869d8c18361fcd5fcf731f9edf2d38ae07cc0cf)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 32 ++++---
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 97 +++++++++++++++-------
 2 files changed, 87 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 349fbd860e4..9b0d678ffc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -368,7 +368,7 @@ public class Consumer {
     }
 
     public CompletableFuture<Void> messageAcked(CommandAck ack) {
-        CompletableFuture<Void> future;
+        CompletableFuture<Long> future;
 
         this.lastAckedTimestamp = System.currentTimeMillis();
         Map<String, Long> properties = Collections.emptyMap();
@@ -404,11 +404,12 @@ public class Consumer {
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
                 List<PositionImpl> positionsAcked = Collections.singletonList(position);
                 future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
-                        ack.getTxnidLeastBits(), positionsAcked);
+                        ack.getTxnidLeastBits(), positionsAcked)
+                        .thenApply(unused -> 1L);
             } else {
                 List<Position> positionsAcked = Collections.singletonList(position);
                 subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
-                future = CompletableFuture.completedFuture(null);
+                future = CompletableFuture.completedFuture(1L);
             }
         } else {
             if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
@@ -419,16 +420,16 @@ public class Consumer {
         }
 
         return future
-                .whenComplete((__, t) -> {
-                    if (t == null) {
-                        this.messageAckRate.recordEvent(ack.getMessageIdsCount());
-                    }
+                .thenApply(v -> {
+                    this.messageAckRate.recordEvent(v);
+                    return null;
                 });
     }
 
     //this method is for individual ack not carry the transaction
-    private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
+    private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
         List<Position> positionsAcked = new ArrayList<>();
+        long totalAckCount = 0;
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
@@ -461,10 +462,12 @@ public class Consumer {
             checkCanRemovePendingAcksAndHandle(position, msgId);
 
             checkAckValidationError(ack, position);
+
+            totalAckCount += ackedCount;
         }
         subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        completableFuture.complete(null);
+        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
+        completableFuture.complete(totalAckCount);
         if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
             completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
                 //check if the position can remove from the consumer pending acks.
@@ -482,7 +485,7 @@ public class Consumer {
 
 
     //this method is for individual ack carry the transaction
-    private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
+    private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
         // Individual ack
         List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();
         if (!isTransactionEnabled()) {
@@ -490,6 +493,7 @@ public class Consumer {
                     new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
         }
 
+        LongAdder totalAckCount = new LongAdder();
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
@@ -518,6 +522,8 @@ public class Consumer {
             checkCanRemovePendingAcksAndHandle(position, msgId);
 
             checkAckValidationError(ack, position);
+
+            totalAckCount.add(ackedCount);
         }
 
         CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
@@ -533,7 +539,7 @@ public class Consumer {
                         }
                     }));
         }
-        return completableFuture;
+        return completableFuture.thenApply(__ -> totalAckCount.sum());
     }
 
     private long getBatchSize(MessageIdData msgId) {
@@ -756,9 +762,9 @@ public class Consumer {
         messageAckRate.calculateRate();
 
         stats.msgRateOut = msgOut.getRate();
-        stats.messageAckRate = messageAckRate.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
         stats.msgRateRedeliver = msgRedeliver.getRate();
+        stats.messageAckRate = messageAckRate.getValueRate();
         stats.chunkedMessageRate = chunkedMessageRate.getRate();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 412efe69632..e35be235f3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -53,6 +53,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -249,56 +250,94 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
     private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
             throws Exception {
-        final int messages = 100;
+        final int messages = 1000;
         String subName = "test_sub";
+        CountDownLatch latch = new CountDownLatch(messages);
 
         @Cleanup
-        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .enableBatching(true).batchingMaxMessages(10).create();
+
+        MessageListener<String> listener = (consumer, msg) -> {
+            try {
+                consumer.acknowledge(msg);
+                latch.countDown();
+            } catch (PulsarClientException e) {
+                //ignore
+            }
+        };
         @Cleanup
-        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
-                .subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
+        Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener(listener)
+                .subscribe();
+        @Cleanup
+        Consumer<String> c2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener(listener)
+                .subscribe();
 
         String namespace = TopicName.get(topicName).getNamespace();
 
         for (int i = 0; i < messages; i++) {
-            producer.send(UUID.randomUUID().toString());
+            producer.sendAsync(UUID.randomUUID().toString());
         }
+        producer.flush();
 
-        for (int i = 0; i < messages; i++) {
-            Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
-            if (message == null) {
-                break;
-            }
-
-            consumer.acknowledge(message);
-        }
+        latch.await(20, TimeUnit.SECONDS);
+        TimeUnit.SECONDS.sleep(1);
 
         Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
         Subscription subscription = topic.getSubscription(subName);
         List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
-        Assert.assertEquals(consumers.size(), 1);
+        Assert.assertEquals(consumers.size(), 2);
         org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0);
+        org.apache.pulsar.broker.service.Consumer consumer2 = consumers.get(1);
         consumer1.updateRates();
+        consumer2.updateRates();
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
         String metricStr = output.toString(StandardCharsets.UTF_8.name());
 
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
-        Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
-        Assert.assertTrue(metrics.size() > 0);
-
-        int num = 0;
-        for (PrometheusMetricsTest.Metric metric : metrics) {
-            if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) {
-                num++;
-                Assert.assertTrue(metric.value > 0);
-            } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
-                num++;
-                Assert.assertTrue(metric.value > 0);
-            }
+        Collection<PrometheusMetricsTest.Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate");
+
+        String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
+        Collection<PrometheusMetricsTest.Metric> rateOutMetric = metricsMap.get(rateOutMetricName);
+        Assert.assertTrue(ackRateMetric.size() > 0);
+        Assert.assertTrue(rateOutMetric.size() > 0);
+
+        if (exposeTopicLevelMetrics) {
+            String consumer1Name = consumer1.consumerName();
+            String consumer2Name = consumer2.consumerName();
+            double totalAckRate = ackRateMetric.stream()
+                    .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
+                            || metric.tags.get("consumer_name").equals(consumer2Name))
+                    .mapToDouble(metric -> metric.value).sum();
+            double totalRateOut = rateOutMetric.stream()
+                    .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
+                            || metric.tags.get("consumer_name").equals(consumer2Name))
+                    .mapToDouble(metric -> metric.value).sum();
+
+            Assert.assertTrue(totalAckRate > 0D);
+            Assert.assertTrue(totalRateOut > 0D);
+            Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
+        } else {
+            double totalAckRate = ackRateMetric.stream()
+                    .filter(metric -> namespace.equals(metric.tags.get("namespace")))
+                    .mapToDouble(metric -> metric.value).sum();
+            double totalRateOut = rateOutMetric.stream()
+                    .filter(metric -> namespace.equals(metric.tags.get("namespace")))
+                    .mapToDouble(metric -> metric.value).sum();
+
+            Assert.assertTrue(totalAckRate > 0D);
+            Assert.assertTrue(totalRateOut > 0D);
+            Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
         }
-
-        Assert.assertTrue(num > 0);
     }
 }