You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/29 02:09:40 UTC

[pulsar] branch branch-2.9 updated: [branch-2.9] Add publishRateLimitedTimes to topic metrics. (#15739)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 15fdf15ff6f [branch-2.9] Add publishRateLimitedTimes to topic metrics. (#15739)
15fdf15ff6f is described below

commit 15fdf15ff6f05e514b88143446da9439a7bab17b
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Sun May 29 10:09:30 2022 +0800

    [branch-2.9] Add publishRateLimitedTimes to topic metrics. (#15739)
---
 .../pulsar/broker/service/AbstractTopic.java       |   9 ++
 .../apache/pulsar/broker/service/ServerCnx.java    |  13 +++
 .../org/apache/pulsar/broker/service/Topic.java    |   5 +
 .../broker/service/persistent/PersistentTopic.java |   3 +-
 .../stats/prometheus/NamespaceStatsAggregator.java |   1 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |   5 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 105 ++++++++++++++++++++-
 .../common/policies/data/stats/TopicStatsImpl.java |   5 +
 8 files changed, 143 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fbdbd611d50..69c51c77d44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -116,6 +116,10 @@ public abstract class AbstractTopic implements Topic {
     private LongAdder bytesInCounter = new LongAdder();
     private LongAdder msgInCounter = new LongAdder();
 
+    private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
+    protected volatile long publishRateLimitedTimes = 0;
+
     protected volatile Optional<Long> topicEpoch = Optional.empty();
     private volatile boolean hasExclusiveProducer;
     // pointer to the exclusive producer
@@ -634,6 +638,11 @@ public abstract class AbstractTopic implements Topic {
         }
     }
 
+    @Override
+    public long increasePublishLimitedTimes() {
+        return RATE_LIMITED_UPDATER.incrementAndGet(this);
+    }
+
     protected void internalAddProducer(Producer producer) throws BrokerServiceException {
         if (isProducersExceeded()) {
             log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3c420fe43b3..58035b84c38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2512,6 +2512,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             // When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
             // client connection, possibly shared between multiple producers
             ctx.channel().config().setAutoRead(false);
+            recordRateLimitMetrics(producers);
             autoReadDisabledRateLimiting = isPublishRateExceeded;
             throttledConnections.inc();
         }
@@ -2533,6 +2534,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         }
     }
 
+    private void recordRateLimitMetrics(ConcurrentLongHashMap<CompletableFuture<Producer>> producers) {
+        producers.forEach((key, producerFuture) -> {
+            if (producerFuture != null && producerFuture.isDone()) {
+                Producer p = producerFuture.getNow(null);
+                if (p != null && p.getTopic() != null) {
+                    p.getTopic().increasePublishLimitedTimes();
+                }
+            }
+        });
+    }
+
     @Override
     public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
         if (pendingBytesPerThread.get().addAndGet(-msgSize) < resumeThresholdPendingBytesPerThread
@@ -2577,6 +2589,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     public void disableCnxAutoRead() {
         if (ctx != null && ctx.channel().config().isAutoRead()) {
             ctx.channel().config().setAutoRead(false);
+            recordRateLimitMetrics(producers);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 9dd945f78da..14e8e3fe004 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -129,6 +129,11 @@ public interface Topic {
      */
     void recordAddLatency(long latency, TimeUnit unit);
 
+    /**
+     * increase the publishing limited times.
+     */
+    long increasePublishLimitedTimes();
+
     CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType,
                                           int priorityLevel, String consumerName, boolean isDurable,
                                           MessageId startMessageId,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index db824b0706b..22531e8ed09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1682,7 +1682,7 @@ public class PersistentTopic extends AbstractTopic
     public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
                             StatsOutputStream topicStatsStream,
                             ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
-
+        this.publishRateLimitedTimes = 0;
         TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
         topicStatsHelper.reset();
 
@@ -1931,6 +1931,7 @@ public class PersistentTopic extends AbstractTopic
         stats.waitingPublishers = getWaitingProducersCount();
         stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
         stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
+        stats.publishRateLimitedTimes = publishRateLimitedTimes;
 
         subscriptions.forEach((name, subscription) -> {
             SubscriptionStatsImpl subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 9df9ad4c7c6..2025c59c1a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -148,6 +148,7 @@ public class NamespaceStatsAggregator {
         stats.msgOutCounter = tStatus.msgOutCounter;
         stats.bytesOutCounter = tStatus.bytesOutCounter;
         stats.averageMsgSize = tStatus.averageMsgSize;
+        stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
 
         stats.producersCount = 0;
         topic.getProducers().values().forEach(producer -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index bfa427e5e3a..599c3d82f6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -44,6 +44,8 @@ class TopicStats {
 
     public long msgBacklog;
 
+    long publishRateLimitedTimes;
+
     long backlogQuotaLimit;
     long backlogQuotaLimitTime;
 
@@ -82,6 +84,7 @@ class TopicStats {
 
         managedLedgerStats.reset();
         msgBacklog = 0;
+        publishRateLimitedTimes = 0L;
         backlogQuotaLimit = 0;
         backlogQuotaLimitTime = -1;
 
@@ -133,6 +136,8 @@ class TopicStats {
                 splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
                 stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
+                splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
                 .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index b4094e488f3..1304d5c7ae2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -55,12 +55,12 @@ import java.util.regex.Pattern;
 import javax.crypto.SecretKey;
 import javax.naming.AuthenticationException;
 import lombok.Cleanup;
-import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
@@ -74,7 +74,6 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -95,6 +94,108 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
+        resetConfig();
+    }
+
+    @Test
+    public void testPublishRateLimitedTimes() throws Exception {
+        cleanup();
+        checkPublishRateLimitedTimes(true);
+        cleanup();
+        checkPublishRateLimitedTimes(false);
+    }
+
+    private void checkPublishRateLimitedTimes(boolean preciseRateLimit) throws Exception {
+        if (preciseRateLimit) {
+            conf.setBrokerPublisherThrottlingTickTimeMillis(10000000);
+            conf.setMaxPublishRatePerTopicInMessages(1);
+            conf.setMaxPublishRatePerTopicInBytes(1);
+            conf.setBrokerPublisherThrottlingMaxMessageRate(100000);
+            conf.setBrokerPublisherThrottlingMaxByteRate(10000000);
+        } else {
+            conf.setBrokerPublisherThrottlingTickTimeMillis(1);
+            conf.setBrokerPublisherThrottlingMaxMessageRate(1);
+            conf.setBrokerPublisherThrottlingMaxByteRate(1);
+        }
+        conf.setStatsUpdateFrequencyInSecs(100000000);
+        conf.setPreciseTopicPublishRateLimiterEnable(preciseRateLimit);
+        setup();
+        String ns1 = "prop/ns-abc1" + UUID.randomUUID();
+        admin.namespaces().createNamespace(ns1, 1);
+        String topicName = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
+        String topicName2 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
+        String topicName3 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
+        // Use another connection
+        @Cleanup
+        PulsarClient client2 = newPulsarClient(lookupUrl.toString(), 0);
+
+        Producer<byte[]> producer = pulsarClient.newProducer().producerName("my-pub").enableBatching(false)
+                .topic(topicName).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().producerName("my-pub-2").enableBatching(false)
+                .topic(topicName2).create();
+        Producer<byte[]> producer3 = client2.newProducer().producerName("my-pub-2").enableBatching(false)
+                .topic(topicName3).create();
+        producer.sendAsync(new byte[11]);
+
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(topicName, false).get().get();
+        Field field = AbstractTopic.class.getDeclaredField("publishRateLimitedTimes");
+        field.setAccessible(true);
+        Awaitility.await().untilAsserted(() -> {
+            long value = (long) field.get(persistentTopic);
+            assertEquals(value, 1);
+        });
+        @Cleanup
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        assertTrue(metrics.containsKey("pulsar_publish_rate_limit_times"));
+        metrics.get("pulsar_publish_rate_limit_times").forEach(item -> {
+            if (ns1.equals(item.tags.get("namespace"))) {
+                if (item.tags.get("topic").equals(topicName)) {
+                    assertEquals(item.value, 1);
+                    return;
+                } else if (item.tags.get("topic").equals(topicName2)) {
+                    assertEquals(item.value, 1);
+                    return;
+                } else if (item.tags.get("topic").equals(topicName3)) {
+                    //When using precise rate limiting, we only trigger the rate limiting of the topic,
+                    // so if the topic is not using the same connection, the rate limiting times will be 0
+                    //When using asynchronous rate limiting, we will trigger the broker-level rate limiting,
+                    // and all connections will be limited at this time.
+                    if (preciseRateLimit) {
+                        assertEquals(item.value, 0);
+                    } else {
+                        assertEquals(item.value, 1);
+                    }
+                    return;
+                }
+                fail("should not fail");
+            }
+        });
+        // Stats updater will reset the stats
+        pulsar.getBrokerService().updateRates();
+        Awaitility.await().untilAsserted(() -> {
+            long value = (long) field.get(persistentTopic);
+            assertEquals(value, 0);
+        });
+
+        @Cleanup
+        ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2);
+        String metricsStr2 = statsOut2.toString();
+        Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
+        assertTrue(metrics2.containsKey("pulsar_publish_rate_limit_times"));
+        metrics2.get("pulsar_publish_rate_limit_times").forEach(item -> {
+            if (ns1.equals(item.tags.get("namespace"))) {
+                assertEquals(item.value, 0);
+            }
+        });
+
+        producer.close();
+        producer2.close();
+        producer3.close();
     }
 
     @Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index f3b3944b5fd..de016b0cf8f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -76,6 +76,9 @@ public class TopicStatsImpl implements TopicStats {
     /** Get estimated total unconsumed or backlog size in bytes. */
     public long backlogSize;
 
+    /** The number of times the publishing rate limit was triggered. */
+    public long publishRateLimitedTimes;
+
     /** Space used to store the offloaded messages for the topic/. */
     public long offloadedStorageSize;
 
@@ -160,6 +163,7 @@ public class TopicStatsImpl implements TopicStats {
         this.lastOffloadLedgerId = 0;
         this.lastOffloadFailureTimeStamp = 0;
         this.lastOffloadSuccessTimeStamp = 0;
+        this.publishRateLimitedTimes = 0L;
         this.compaction.reset();
     }
 
@@ -182,6 +186,7 @@ public class TopicStatsImpl implements TopicStats {
         this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
         this.backlogSize += stats.backlogSize;
+        this.publishRateLimitedTimes += stats.publishRateLimitedTimes;
         this.offloadedStorageSize += stats.offloadedStorageSize;
         this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;