You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/29 02:09:40 UTC
[pulsar] branch branch-2.9 updated: [branch-2.9] Add publishRateLimitedTimes to topic metrics. (#15739)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 15fdf15ff6f [branch-2.9] Add publishRateLimitedTimes to topic metrics. (#15739)
15fdf15ff6f is described below
commit 15fdf15ff6f05e514b88143446da9439a7bab17b
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Sun May 29 10:09:30 2022 +0800
[branch-2.9] Add publishRateLimitedTimes to topic metrics. (#15739)
---
.../pulsar/broker/service/AbstractTopic.java | 9 ++
.../apache/pulsar/broker/service/ServerCnx.java | 13 +++
.../org/apache/pulsar/broker/service/Topic.java | 5 +
.../broker/service/persistent/PersistentTopic.java | 3 +-
.../stats/prometheus/NamespaceStatsAggregator.java | 1 +
.../pulsar/broker/stats/prometheus/TopicStats.java | 5 +
.../pulsar/broker/stats/PrometheusMetricsTest.java | 105 ++++++++++++++++++++-
.../common/policies/data/stats/TopicStatsImpl.java | 5 +
8 files changed, 143 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fbdbd611d50..69c51c77d44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -116,6 +116,10 @@ public abstract class AbstractTopic implements Topic {
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
+ private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
+ protected volatile long publishRateLimitedTimes = 0;
+
protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
// pointer to the exclusive producer
@@ -634,6 +638,11 @@ public abstract class AbstractTopic implements Topic {
}
}
+    @Override
+    public long increasePublishLimitedTimes() {
+        // The shared static field updater bumps the volatile counter atomically, so each topic
+        // only carries a plain long field instead of its own AtomicLong instance.
+        return RATE_LIMITED_UPDATER.addAndGet(this, 1L);
+    }
+
protected void internalAddProducer(Producer producer) throws BrokerServiceException {
if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3c420fe43b3..58035b84c38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2512,6 +2512,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
ctx.channel().config().setAutoRead(false);
+ recordRateLimitMetrics(producers);
autoReadDisabledRateLimiting = isPublishRateExceeded;
throttledConnections.inc();
}
@@ -2533,6 +2534,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
}
+    /**
+     * Counts one publish-throttling event against the topic of every producer registered on this connection.
+     *
+     * <p>Only futures that completed normally are considered. {@link CompletableFuture#isDone()} is also true
+     * for futures that completed exceptionally or were cancelled, and {@link CompletableFuture#getNow(Object)}
+     * rethrows their failure as a {@code CompletionException}. That exception would abort the caller's
+     * throttling path after auto-read has already been switched off.
+     *
+     * <p>NOTE(review): a topic with several producers on this connection is incremented once per producer.
+     *
+     * @param producers the producer futures registered on this connection
+     */
+    private void recordRateLimitMetrics(ConcurrentLongHashMap<CompletableFuture<Producer>> producers) {
+        producers.forEach((key, producerFuture) -> {
+            if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
+                // Producer not created yet, or its creation failed: nothing to attribute the event to.
+                return;
+            }
+            Producer p = producerFuture.getNow(null);
+            if (p != null && p.getTopic() != null) {
+                p.getTopic().increasePublishLimitedTimes();
+            }
+        });
+    }
+
@Override
public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
if (pendingBytesPerThread.get().addAndGet(-msgSize) < resumeThresholdPendingBytesPerThread
@@ -2577,6 +2589,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
public void disableCnxAutoRead() {
if (ctx != null && ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(false);
+ // Count this auto-read suspension (presumably caused by publish throttling) against the topic of
+ // every producer on this connection, so it shows up in pulsar_publish_rate_limit_times.
+ recordRateLimitMetrics(producers);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 9dd945f78da..14e8e3fe004 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -129,6 +129,11 @@ public interface Topic {
*/
void recordAddLatency(long latency, TimeUnit unit);
+ /**
+ * Records one more publish-throttling event for this topic (exported as pulsar_publish_rate_limit_times).
+ *
+ * @return the updated count; the AbstractTopic implementation returns the value after the increment
+ */
+ long increasePublishLimitedTimes();
+
CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable,
MessageId startMessageId,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index db824b0706b..22531e8ed09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1682,7 +1682,7 @@ public class PersistentTopic extends AbstractTopic
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
-
+ this.publishRateLimitedTimes = 0;
TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
topicStatsHelper.reset();
@@ -1931,6 +1931,7 @@ public class PersistentTopic extends AbstractTopic
stats.waitingPublishers = getWaitingProducersCount();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
+ stats.publishRateLimitedTimes = publishRateLimitedTimes;
subscriptions.forEach((name, subscription) -> {
SubscriptionStatsImpl subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 9df9ad4c7c6..2025c59c1a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -148,6 +148,7 @@ public class NamespaceStatsAggregator {
stats.msgOutCounter = tStatus.msgOutCounter;
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
+ stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index bfa427e5e3a..599c3d82f6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -44,6 +44,8 @@ class TopicStats {
public long msgBacklog;
+ long publishRateLimitedTimes;
+
long backlogQuotaLimit;
long backlogQuotaLimitTime;
@@ -82,6 +84,7 @@ class TopicStats {
managedLedgerStats.reset();
msgBacklog = 0;
+ publishRateLimitedTimes = 0L;
backlogQuotaLimit = 0;
backlogQuotaLimitTime = -1;
@@ -133,6 +136,8 @@ class TopicStats {
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
.offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index b4094e488f3..1304d5c7ae2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -55,12 +55,12 @@ import java.util.regex.Pattern;
import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
-import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
@@ -74,7 +74,6 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -95,6 +94,108 @@ public class PrometheusMetricsTest extends BrokerTestBase {
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
+ // Presumably restores the default broker config, so overrides made during a test run (e.g. the
+ // rate-limit settings in checkPublishRateLimitedTimes) do not leak into the next setup().
+ resetConfig();
+ }
+
+    /**
+     * Runs the publish-rate-limit metric check in both throttling modes: first with the precise per-topic
+     * limiter, then with broker-level publisher throttling. cleanup() runs before each pass because the
+     * check reconfigures the broker and calls setup() itself.
+     */
+    @Test
+    public void testPublishRateLimitedTimes() throws Exception {
+        for (boolean preciseRateLimit : new boolean[] {true, false}) {
+            cleanup();
+            checkPublishRateLimitedTimes(preciseRateLimit);
+        }
+    }
+
+ private void checkPublishRateLimitedTimes(boolean preciseRateLimit) throws Exception {
+ if (preciseRateLimit) {
+ conf.setBrokerPublisherThrottlingTickTimeMillis(10000000);
+ conf.setMaxPublishRatePerTopicInMessages(1);
+ conf.setMaxPublishRatePerTopicInBytes(1);
+ conf.setBrokerPublisherThrottlingMaxMessageRate(100000);
+ conf.setBrokerPublisherThrottlingMaxByteRate(10000000);
+ } else {
+ conf.setBrokerPublisherThrottlingTickTimeMillis(1);
+ conf.setBrokerPublisherThrottlingMaxMessageRate(1);
+ conf.setBrokerPublisherThrottlingMaxByteRate(1);
+ }
+ conf.setStatsUpdateFrequencyInSecs(100000000);
+ conf.setPreciseTopicPublishRateLimiterEnable(preciseRateLimit);
+ setup();
+ String ns1 = "prop/ns-abc1" + UUID.randomUUID();
+ admin.namespaces().createNamespace(ns1, 1);
+ String topicName = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
+ String topicName2 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
+ String topicName3 = "persistent://" + ns1 + "/metrics" + UUID.randomUUID();
+ // Use another connection
+ @Cleanup
+ PulsarClient client2 = newPulsarClient(lookupUrl.toString(), 0);
+
+ Producer<byte[]> producer = pulsarClient.newProducer().producerName("my-pub").enableBatching(false)
+ .topic(topicName).create();
+ Producer<byte[]> producer2 = pulsarClient.newProducer().producerName("my-pub-2").enableBatching(false)
+ .topic(topicName2).create();
+ Producer<byte[]> producer3 = client2.newProducer().producerName("my-pub-2").enableBatching(false)
+ .topic(topicName3).create();
+ producer.sendAsync(new byte[11]);
+
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topicName, false).get().get();
+ Field field = AbstractTopic.class.getDeclaredField("publishRateLimitedTimes");
+ field.setAccessible(true);
+ Awaitility.await().untilAsserted(() -> {
+ long value = (long) field.get(persistentTopic);
+ assertEquals(value, 1);
+ });
+ @Cleanup
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+ String metricsStr = statsOut.toString();
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+ assertTrue(metrics.containsKey("pulsar_publish_rate_limit_times"));
+ metrics.get("pulsar_publish_rate_limit_times").forEach(item -> {
+ if (ns1.equals(item.tags.get("namespace"))) {
+ if (item.tags.get("topic").equals(topicName)) {
+ assertEquals(item.value, 1);
+ return;
+ } else if (item.tags.get("topic").equals(topicName2)) {
+ assertEquals(item.value, 1);
+ return;
+ } else if (item.tags.get("topic").equals(topicName3)) {
+ //When using precise rate limiting, we only trigger the rate limiting of the topic,
+ // so if the topic is not using the same connection, the rate limiting times will be 0
+ //When using asynchronous rate limiting, we will trigger the broker-level rate limiting,
+ // and all connections will be limited at this time.
+ if (preciseRateLimit) {
+ assertEquals(item.value, 0);
+ } else {
+ assertEquals(item.value, 1);
+ }
+ return;
+ }
+ fail("should not fail");
+ }
+ });
+ // Stats updater will reset the stats
+ pulsar.getBrokerService().updateRates();
+ Awaitility.await().untilAsserted(() -> {
+ long value = (long) field.get(persistentTopic);
+ assertEquals(value, 0);
+ });
+
+ @Cleanup
+ ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2);
+ String metricsStr2 = statsOut2.toString();
+ Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
+ assertTrue(metrics2.containsKey("pulsar_publish_rate_limit_times"));
+ metrics2.get("pulsar_publish_rate_limit_times").forEach(item -> {
+ if (ns1.equals(item.tags.get("namespace"))) {
+ assertEquals(item.value, 0);
+ }
+ });
+
+ producer.close();
+ producer2.close();
+ producer3.close();
}
@Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index f3b3944b5fd..de016b0cf8f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -76,6 +76,9 @@ public class TopicStatsImpl implements TopicStats {
/** Get estimated total unconsumed or backlog size in bytes. */
public long backlogSize;
+ /** The number of times the publishing rate limit was triggered. */
+ public long publishRateLimitedTimes;
+
/** Space used to store the offloaded messages for the topic/. */
public long offloadedStorageSize;
@@ -160,6 +163,7 @@ public class TopicStatsImpl implements TopicStats {
this.lastOffloadLedgerId = 0;
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
+ this.publishRateLimitedTimes = 0L;
this.compaction.reset();
}
@@ -182,6 +186,7 @@ public class TopicStatsImpl implements TopicStats {
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
this.backlogSize += stats.backlogSize;
+ this.publishRateLimitedTimes += stats.publishRateLimitedTimes;
this.offloadedStorageSize += stats.offloadedStorageSize;
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;