You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/17 05:56:26 UTC

[pulsar] branch master updated: [pulsar-broker] add broker-add-latency metrics (#4290)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e32b45a  [pulsar-broker] add broker-add-latency metrics (#4290)
e32b45a is described below

commit e32b45ae8468809d33cd84823cc2e0f06f57abe7
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu May 16 22:56:21 2019 -0700

    [pulsar-broker] add broker-add-latency metrics (#4290)
---
 .../org/apache/pulsar/broker/service/Producer.java | 11 ++++-
 .../org/apache/pulsar/broker/service/Topic.java    |  7 ++++
 .../service/nonpersistent/NonPersistentTopic.java  | 13 +++++-
 .../broker/service/persistent/PersistentTopic.java | 16 ++++++-
 .../apache/pulsar/broker/stats/NamespaceStats.java | 42 ++++++++++++++++++-
 .../client/api/SimpleProducerConsumerStatTest.java | 49 ++++++++++++++++++++++
 .../pulsar/common/policies/data/TopicStats.java    |  3 +-
 7 files changed, 135 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 64b103b..fa839a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -32,7 +32,9 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
@@ -169,7 +171,8 @@ public class Producer {
 
         startPublishOperation();
         topic.publishMessage(headersAndPayload,
-                MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize));
+                MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
+                        System.nanoTime()));
     }
 
     private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -243,6 +246,7 @@ public class Producer {
         private Rate rateIn;
         private int msgSize;
         private long batchSize;
+        private long startTimeNs;
 
         private String originalProducerName;
         private long originalSequenceId;
@@ -304,6 +308,9 @@ public class Producer {
                 this.ledgerId = ledgerId;
                 this.entryId = entryId;
+                // Record the latency before handing this pooled context to the event loop: once queued it may be
+                // run and recycled concurrently (recycle() resets startTimeNs), so its fields must be read first.
+                producer.topic.recordAddLatency(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTimeNs));
                 producer.cnx.ctx().channel().eventLoop().execute(this);
             }
         }
 
@@ -328,7 +335,7 @@ public class Producer {
         }
 
         static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
-                long batchSize) {
+                long batchSize, long startTimeNs) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = sequenceId;
@@ -337,6 +344,7 @@ public class Producer {
             callback.batchSize = batchSize;
             callback.originalProducerName = null;
             callback.originalSequenceId = -1;
+            callback.startTimeNs = startTimeNs;
             return callback;
         }
 
@@ -360,6 +368,7 @@ public class Producer {
             ledgerId = -1;
             entryId = -1;
             batchSize = 0;
+            startTimeNs = -1;
             recyclerHandle.recycle(this);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 00c76e2..12c9bdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -82,6 +82,13 @@ public interface Topic {
     void addProducer(Producer producer) throws BrokerServiceException;
 
     void removeProducer(Producer producer);
+
+    /**
+     * Records the latency of one add-entry (publish) operation on this topic.
+     *
+     * @param latencyUSec the add-entry latency, in microseconds
+     */
+    void recordAddLatency(long latencyUSec);
 
     CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
             int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index dc74bea..e06e61c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.nonpersistent;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import com.carrotsearch.hppc.ObjectObjectHashMap;
@@ -43,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -127,6 +129,7 @@ public class NonPersistentTopic implements Topic {
             .newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
     private volatile long entriesAddedCounter = 0;
     private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
+    private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
 
     private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
         @Override
@@ -833,7 +836,14 @@ public class NonPersistentTopic implements Topic {
         bundleStats.msgRateOut += topicStats.aggMsgRateOut;
         bundleStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
         bundleStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
-
+        // add publish-latency metrics: sum (not copy) this topic's buckets into nsStats, which aggregates every
+        // topic of the namespace like its other += counters; a copy would keep only the last topic's histogram
+        this.addEntryLatencyStatsUsec.refresh();
+        long[] addLatencyBuckets = this.addEntryLatencyStatsUsec.getBuckets();
+        for (int i = 0; i < addLatencyBuckets.length && i < nsStats.addLatencyBucket.length; i++) {
+            nsStats.addLatencyBucket[i] += addLatencyBuckets[i];
+        }
+        this.addEntryLatencyStatsUsec.reset();
         // Close topic object
         topicStatsStream.endObject();
     }
@@ -1060,4 +1070,9 @@ public class NonPersistentTopic implements Topic {
                     }
                 });
     }
+
+    @Override
+    public void recordAddLatency(long latencyUSec) {
+        addEntryLatencyStatsUsec.addValue(latencyUSec);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ee4b05f..bfbc2da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
@@ -55,6 +56,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -186,6 +188,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         SchemaCompatibilityStrategy.FULL;
     // schema validation enforced flag
     private volatile boolean schemaValidationEnforced = false;
+    private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
 
     private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
         @Override
@@ -1399,6 +1402,15 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
         // Close topic object
         topicStatsStream.endObject();
+
+        // add publish-latency metrics: sum (not copy) this topic's buckets into nsStats, which aggregates every
+        // topic of the namespace like nsStats.msgRateIn; a copy would keep only the last topic's histogram
+        this.addEntryLatencyStatsUsec.refresh();
+        long[] addLatencyBuckets = this.addEntryLatencyStatsUsec.getBuckets();
+        for (int i = 0; i < addLatencyBuckets.length && i < nsStats.addLatencyBucket.length; i++) {
+            nsStats.addLatencyBucket[i] += addLatencyBuckets[i];
+        }
+        this.addEntryLatencyStatsUsec.reset();
     }
 
     public TopicStats getStats() {
@@ -1933,4 +1945,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                     }
                 });
     }
+
+    @Override
+    public void recordAddLatency(long latencyUSec) {
+        addEntryLatencyStatsUsec.addValue(latencyUSec);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java
index 20f81df..e5c8735 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/NamespaceStats.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.broker.stats;
 
+import java.util.Arrays;
+import java.util.Locale;
 import java.util.Map;
 
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import org.apache.pulsar.common.stats.Metrics;
 
 import com.google.common.collect.Maps;
@@ -38,6 +41,28 @@ public class NamespaceStats {
     public int producerCount;
     public int replicatorCount;
     public int subsCount;
+    public static final String BRK_ADD_ENTRY_LATENCY_PREFIX = "brk_AddEntryLatencyBuckets";
+    public long[] addLatencyBucket = new long[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
+    public static final String[] ADD_LATENCY_BUCKET_KEYS = new String[ENTRY_LATENCY_BUCKETS_USEC.length + 1];
+
+    static {
+        // create static ref for add-latency-bucket keys to avoid new object allocation on every stats call.
+        // Locale.ROOT keeps '.' as the decimal separator whatever the broker's default locale is.
+        for (int i = 0; i < ENTRY_LATENCY_BUCKETS_USEC.length + 1; i++) {
+            String key;
+            // example of key : "<metric_key>_0.0_0.5" (bucket bounds converted from usec to msec)
+            if (i == 0 && ENTRY_LATENCY_BUCKETS_USEC.length > 0) {
+                key = String.format(Locale.ROOT, "%s_0.0_%1.1f", BRK_ADD_ENTRY_LATENCY_PREFIX,
+                        ENTRY_LATENCY_BUCKETS_USEC[i] / 1000.0);
+            } else if (i < ENTRY_LATENCY_BUCKETS_USEC.length) {
+                key = String.format(Locale.ROOT, "%s_%1.1f_%1.1f", BRK_ADD_ENTRY_LATENCY_PREFIX,
+                        ENTRY_LATENCY_BUCKETS_USEC[i - 1] / 1000.0, ENTRY_LATENCY_BUCKETS_USEC[i] / 1000.0);
+            } else {
+                key = BRK_ADD_ENTRY_LATENCY_PREFIX + "_OVERFLOW";
+            }
+            ADD_LATENCY_BUCKET_KEYS[i] = key;
+        }
+    }
 
     public NamespaceStats() {
         reset();
@@ -56,6 +81,7 @@ public class NamespaceStats {
         this.producerCount = 0;
         this.replicatorCount = 0;
         this.subsCount = 0;
+        clear(addLatencyBucket);
     }
 
     public Metrics add(String namespace) {
@@ -75,9 +101,30 @@ public class NamespaceStats {
         dMetrics.put("brk_msg_backlog", msgBacklog);
         dMetrics.put("brk_replication_backlog", msgReplBacklog);
         dMetrics.put("brk_max_replication_delay_second", maxMsgReplDelayInSeconds);
-
+        // add add-latency metrics
+        for (int i = 0; i < this.addLatencyBucket.length; i++) {
+            dMetrics.put(ADD_LATENCY_BUCKET_KEYS[i], this.addLatencyBucket[i]);
+        }
         return dMetrics;
 
     }
 
+    /**
+     * Copies {@code src} into {@code dest}; does nothing if either array is null or their lengths differ.
+     */
+    public static void copy(long[] src, long[] dest) {
+        if (src != null && dest != null && src.length == dest.length) {
+            System.arraycopy(src, 0, dest, 0, src.length);
+        }
+    }
+
+    /**
+     * Resets every element of {@code list} to zero; a null array is ignored.
+     */
+    public static void clear(long[] list) {
+        if (list != null) {
+            Arrays.fill(list, 0L);
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 2c94966..85946c5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,9 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 
 public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerStatTest.class);
@@ -372,4 +376,55 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testAddBrokerLatencyStats() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic("persistent://my-property/tp1/my-ns/my-topic1");
+
+        Producer<byte[]> producer = producerBuilder.create();
+        try {
+            int numMessages = 11;
+            for (int i = 0; i < numMessages; i++) {
+                String message = "my-message-" + i;
+                producer.send(message.getBytes());
+            }
+
+            pulsar.getBrokerService().updateRates();
+
+            JsonArray metrics = admin.brokerStats().getMetrics();
+
+            // Add-latency buckets are published under keys starting with BRK_ADD_ENTRY_LATENCY_PREFIX inside an
+            // entry's "metrics" object; after the sends above at least one of them must be non-zero.
+            boolean latencyCaptured = false;
+            for (JsonElement entry : metrics) {
+                JsonElement metricsElement = entry.isJsonObject() ? entry.getAsJsonObject().get("metrics") : null;
+                if (metricsElement == null || !metricsElement.isJsonObject()) {
+                    continue; // entry carries no metrics object
+                }
+                JsonObject stat = metricsElement.getAsJsonObject();
+                boolean hasLatencyBuckets = false;
+                for (String key : stat.keySet()) {
+                    if (key.startsWith(NamespaceStats.BRK_ADD_ENTRY_LATENCY_PREFIX)) {
+                        hasLatencyBuckets = true;
+                        JsonElement value = stat.get(key);
+                        if (value.isJsonPrimitive() && value.getAsJsonPrimitive().isNumber()
+                                && value.getAsDouble() > 0.0) {
+                            latencyCaptured = true;
+                        }
+                    }
+                }
+                if (hasLatencyBuckets) {
+                    log.info("add-latency metrics: {}", stat);
+                }
+            }
+
+            assertTrue(latencyCaptured);
+        } finally {
+            producer.close();
+        }
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 4d0b1f5..e2f1802 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -57,7 +57,7 @@ public class TopicStats {
 
     /** Map of replication statistics by remote cluster context */
     public Map<String, ReplicatorStats> replication;
-
+    
     public String deduplicationStatus;
 
     public TopicStats() {
@@ -124,4 +124,5 @@ public class TopicStats {
         }
         return this;
     }
+
 }