You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/17 05:56:26 UTC
[pulsar] branch master updated: [pulsar-broker] add
broker-add-latency metrics (#4290)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e32b45a [pulsar-broker] add broker-add-latency metrics (#4290)
e32b45a is described below
commit e32b45ae8468809d33cd84823cc2e0f06f57abe7
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu May 16 22:56:21 2019 -0700
[pulsar-broker] add broker-add-latency metrics (#4290)
---
.../org/apache/pulsar/broker/service/Producer.java | 11 ++++-
.../org/apache/pulsar/broker/service/Topic.java | 7 ++++
.../service/nonpersistent/NonPersistentTopic.java | 13 +++++-
.../broker/service/persistent/PersistentTopic.java | 16 ++++++-
.../apache/pulsar/broker/stats/NamespaceStats.java | 42 ++++++++++++++++++-
.../client/api/SimpleProducerConsumerStatTest.java | 49 ++++++++++++++++++++++
.../pulsar/common/policies/data/TopicStats.java | 3 +-
7 files changed, 135 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 64b103b..fa839a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -32,7 +32,9 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
@@ -169,7 +171,8 @@ public class Producer {
startPublishOperation();
topic.publishMessage(headersAndPayload,
- MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize));
+ MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
+ System.nanoTime()));
}
private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -243,6 +246,7 @@ public class Producer {
private Rate rateIn;
private int msgSize;
private long batchSize;
+ private long startTimeNs;
private String originalProducerName;
private long originalSequenceId;
@@ -304,6 +308,7 @@ public class Producer {
this.ledgerId = ledgerId;
this.entryId = entryId;
producer.cnx.ctx().channel().eventLoop().execute(this);
+ producer.topic.recordAddLatency(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTimeNs));
}
}
@@ -328,7 +333,7 @@ public class Producer {
}
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
- long batchSize) {
+ long batchSize, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
@@ -337,6 +342,7 @@ public class Producer {
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1;
+ callback.startTimeNs = startTimeNs;
return callback;
}
@@ -360,6 +366,7 @@ public class Producer {
ledgerId = -1;
entryId = -1;
batchSize = 0;
+ startTimeNs = -1;
recyclerHandle.recycle(this);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 00c76e2..12c9bdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -82,6 +82,13 @@ public interface Topic {
void addProducer(Producer producer) throws BrokerServiceException;
void removeProducer(Producer producer);
+
+ /**
+ * Records the add (publish) latency of a message on this topic, in microseconds.
+ *
+ * @param latencyUSec the add latency to record, in microseconds
+ */
+ void recordAddLatency(long latencyUSec);
CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index dc74bea..e06e61c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.nonpersistent;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.carrotsearch.hppc.ObjectObjectHashMap;
@@ -43,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -127,6 +129,7 @@ public class NonPersistentTopic implements Topic {
.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;
private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
+ private StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
@@ -833,7 +836,10 @@ public class NonPersistentTopic implements Topic {
bundleStats.msgRateOut += topicStats.aggMsgRateOut;
bundleStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
bundleStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
-
+ // add publish-latency metrics
+ this.addEntryLatencyStatsUsec.refresh();
+ NamespaceStats.copy(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
+ this.addEntryLatencyStatsUsec.reset();
// Close topic object
topicStatsStream.endObject();
}
@@ -1060,4 +1066,9 @@ public class NonPersistentTopic implements Topic {
}
});
}
+
+ /**
+ * Adds one add-latency sample to this topic's {@code addEntryLatencyStatsUsec} buckets.
+ *
+ * @param latencyUSec latency sample; per the parameter name, expressed in microseconds
+ */
+ @Override
+ public void recordAddLatency(long latencyUSec) {
+ addEntryLatencyStatsUsec.addValue(latencyUSec);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ee4b05f..bfbc2da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
@@ -55,6 +56,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
@@ -186,6 +188,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
SchemaCompatibilityStrategy.FULL;
// schema validation enforced flag
private volatile boolean schemaValidationEnforced = false;
+ private StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
@Override
@@ -1384,7 +1387,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
topicStatsStream.writePair("storageSize", ledger.getEstimatedBacklogSize());
topicStatsStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl) ledger).getPendingAddEntriesCount());
-
+
nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
@@ -1399,6 +1402,11 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// Close topic object
topicStatsStream.endObject();
+
+ // add publish-latency metrics
+ this.addEntryLatencyStatsUsec.refresh();
+ NamespaceStats.copy(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
+ this.addEntryLatencyStatsUsec.reset();
}
public TopicStats getStats() {
@@ -1449,6 +1457,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
stats.storageSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
+
return stats;
}
@@ -1933,4 +1942,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
});
}
+
+ /**
+ * Adds one add-latency sample to this topic's {@code addEntryLatencyStatsUsec} buckets.
+ *
+ * @param latencyUSec latency sample; per the parameter name, expressed in microseconds
+ */
+ @Override
+ public void recordAddLatency(long latencyUSec) {
+ addEntryLatencyStatsUsec.addValue(latencyUSec);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java
index 20f81df..e5c8735 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.stats;
import java.util.Map;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import org.apache.pulsar.common.stats.Metrics;
import com.google.common.collect.Maps;
@@ -38,6 +39,26 @@ public class NamespaceStats {
public int producerCount;
public int replicatorCount;
public int subsCount;
+    /** Common prefix of every broker add-entry latency bucket metric key. */
+    public static final String BRK_ADD_ENTRY_LATENCY_PREFIX = "brk_AddEntryLatencyBuckets";
+    /** Add-latency counters: one slot per boundary in ENTRY_LATENCY_BUCKETS_USEC plus a trailing overflow slot. */
+    public long[] addLatencyBucket = new long[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
+    /** Metric key for each slot of {@link #addLatencyBucket}, built once in the static initializer below. */
+    public static final String[] ADD_LATENCY_BUCKET_KEYS = new String[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
+
+    static {
+        // Build the keys once so that no new key objects are allocated on every stats call.
+        // Boundaries are divided by 1000 (usec -> msec) for the key, e.g. "<metric_key>_0.0_0.5".
+        // Locale.ROOT pins the decimal separator to '.', so keys are identical on every JVM
+        // regardless of the default locale (a comma-decimal locale would otherwise give "_0.0_0,5").
+        for (int i = 0; i < ADD_LATENCY_BUCKET_KEYS.length; i++) {
+            String key;
+            if (i == 0 && ENTRY_LATENCY_BUCKETS_USEC.length > 0) {
+                // first bucket: from zero up to the first boundary
+                key = String.format(java.util.Locale.ROOT, "%s_0.0_%1.1f", BRK_ADD_ENTRY_LATENCY_PREFIX,
+                        ENTRY_LATENCY_BUCKETS_USEC[i] / 1000.0);
+            } else if (i < ENTRY_LATENCY_BUCKETS_USEC.length) {
+                // intermediate bucket: from the previous boundary up to this one
+                key = String.format(java.util.Locale.ROOT, "%s_%1.1f_%1.1f", BRK_ADD_ENTRY_LATENCY_PREFIX,
+                        ENTRY_LATENCY_BUCKETS_USEC[i - 1] / 1000.0, ENTRY_LATENCY_BUCKETS_USEC[i] / 1000.0);
+            } else {
+                // last slot: everything above the highest boundary
+                key = BRK_ADD_ENTRY_LATENCY_PREFIX + "_OVERFLOW";
+            }
+            ADD_LATENCY_BUCKET_KEYS[i] = key;
+        }
+    }
public NamespaceStats() {
reset();
@@ -56,6 +77,7 @@ public class NamespaceStats {
this.producerCount = 0;
this.replicatorCount = 0;
this.subsCount = 0;
+ clear(addLatencyBucket);
}
public Metrics add(String namespace) {
@@ -75,9 +97,27 @@ public class NamespaceStats {
dMetrics.put("brk_msg_backlog", msgBacklog);
dMetrics.put("brk_replication_backlog", msgReplBacklog);
dMetrics.put("brk_max_replication_delay_second", maxMsgReplDelayInSeconds);
-
+ // export one metric per add-latency bucket
+ for (int i = 0; i < this.addLatencyBucket.length; i++) {
+ dMetrics.put(ADD_LATENCY_BUCKET_KEYS[i], this.addLatencyBucket[i]);
+ }
return dMetrics;
}
+    /**
+     * Copies every element of {@code src} into the same position of {@code dest}.
+     *
+     * <p>Nothing is copied when either array is {@code null} or when the two lengths differ.
+     *
+     * @param src  array to read from
+     * @param dest array whose elements are overwritten
+     */
+    public static void copy(long[] src, long[] dest) {
+        if (src == null || dest == null || src.length != dest.length) {
+            return;
+        }
+        System.arraycopy(src, 0, dest, 0, src.length);
+    }
+
+    /**
+     * Sets every element of {@code list} to zero; a {@code null} array is ignored.
+     *
+     * @param list array to zero out in place, may be {@code null}
+     */
+    public static void clear(long[] list) {
+        if (list == null) {
+            return;
+        }
+        java.util.Arrays.fill(list, 0L);
+    }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 2c94966..85946c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,9 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerStatTest.class);
@@ -372,4 +376,49 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
}
}
+    /**
+     * Publishes a few messages and verifies that the broker metrics then contain at least one
+     * add-entry latency bucket (key prefixed with
+     * {@link NamespaceStats#BRK_ADD_ENTRY_LATENCY_PREFIX}) holding a value greater than zero.
+     */
+    @Test
+    public void testAddBrokerLatencyStats() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic("persistent://my-property/tp1/my-ns/my-topic1");
+
+        Producer<byte[]> producer = producerBuilder.create();
+
+        // try/finally so the producer is released even when the assertion below fails
+        try {
+            int numMessages = 11;
+            for (int i = 0; i < numMessages; i++) {
+                String message = "my-message-" + i;
+                producer.send(message.getBytes());
+            }
+
+            // NOTE(review): presumably this triggers the stats refresh that fills the
+            // namespace-level add-latency buckets before they are read - confirm.
+            pulsar.getBrokerService().updateRates();
+
+            JsonArray metrics = admin.brokerStats().getMetrics();
+
+            boolean latencyCaptured = false;
+            for (JsonElement entry : metrics) {
+                // Skip entries of an unexpected shape explicitly instead of swallowing
+                // every exception, so that genuine failures still surface.
+                if (!entry.isJsonObject()) {
+                    continue;
+                }
+                JsonElement metricsElement = entry.getAsJsonObject().get("metrics");
+                if (metricsElement == null || !metricsElement.isJsonObject()) {
+                    continue;
+                }
+
+                JsonObject stat = metricsElement.getAsJsonObject();
+                boolean hasLatencyKey = false;
+                for (String key : stat.keySet()) {
+                    if (!key.startsWith(NamespaceStats.BRK_ADD_ENTRY_LATENCY_PREFIX)) {
+                        continue;
+                    }
+                    hasLatencyKey = true;
+                    JsonElement value = stat.get(key);
+                    if (value.isJsonPrimitive() && value.getAsJsonPrimitive().isNumber()
+                            && value.getAsDouble() > 0.0) {
+                        latencyCaptured = true;
+                    }
+                }
+                if (hasLatencyKey) {
+                    log.info("add-latency metrics: {}", stat);
+                }
+            }
+
+            assertTrue(latencyCaptured);
+        } finally {
+            producer.close();
+        }
+        log.info("-- Exiting {} test --", methodName);
+    }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 4d0b1f5..e2f1802 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -57,7 +57,7 @@ public class TopicStats {
/** Map of replication statistics by remote cluster context */
public Map<String, ReplicatorStats> replication;
-
+
public String deduplicationStatus;
public TopicStats() {
@@ -124,4 +124,5 @@ public class TopicStats {
}
return this;
}
+
}