You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/05 23:44:28 UTC

[incubator-pulsar] branch master updated: Improved API for producer/consumer stats (#1321)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 97c2b44  Improved API for producer/consumer stats (#1321)
97c2b44 is described below

commit 97c2b44a061221784c453fc6ec31fc1ec1718878
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 5 15:44:27 2018 -0800

    Improved API for producer/consumer stats (#1321)
    
    * Improved API for producer/consumer stats
    
    * Fixed compilation after merge
---
 .../client/api/SimpleProducerConsumerStatTest.java |   2 -
 .../client/impl/PatternTopicsConsumerImplTest.java |   3 +-
 .../org/apache/pulsar/client/api/Consumer.java     |  29 +++--
 .../apache/pulsar/client/api/ConsumerStats.java    |  90 ++++++++++++++
 .../org/apache/pulsar/client/api/Producer.java     |  19 +--
 .../apache/pulsar/client/api/ProducerStats.java    | 110 +++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  28 ++---
 .../pulsar/client/impl/ConsumerStatsDisabled.java  |  91 +++++++++++++-
 ...atsDisabled.java => ConsumerStatsRecorder.java} |  32 +++--
 ...erStats.java => ConsumerStatsRecorderImpl.java} |  81 ++++++++-----
 .../client/impl/PartitionedConsumerImpl.java       |   6 +-
 .../client/impl/PartitionedProducerImpl.java       |   6 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  11 +-
 .../pulsar/client/impl/ProducerStatsDisabled.java  |  96 ++++++++++++++-
 ...atsDisabled.java => ProducerStatsRecorder.java} |  25 ++--
 ...erStats.java => ProducerStatsRecorderImpl.java} | 131 ++++++++++++++-------
 .../pulsar/client/impl/TopicsConsumerImpl.java     |  15 +--
 17 files changed, 599 insertions(+), 176 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 30a2c25..8a536f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -34,8 +34,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.impl.ConsumerStats;
-import org.apache.pulsar.client.impl.ProducerStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 10367b8..4f9cc70 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -163,8 +163,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern());
         List<String> topics = ((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
-        @SuppressWarnings("rawtypes")
-        List<ConsumerImpl> consumers = ((PatternTopicsConsumerImpl<?>) consumer).getConsumers();
+        List<ConsumerImpl<byte[]>> consumers = ((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
index a1dd628..876b6d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -22,12 +22,8 @@ import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.client.impl.ConsumerStats;
-
 /**
  * An interface that abstracts behavior of Pulsar's consumer.
- *
- *
  */
 public interface Consumer<T> extends Closeable {
 
@@ -201,16 +197,22 @@ public interface Consumer<T> extends Closeable {
     CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
 
     /**
-     * Get statistics for the consumer
+     * Get statistics for the consumer.
      *
-     * numMsgsReceived : Number of messages received in the current interval numBytesReceived : Number of bytes received
-     * in the current interval numReceiveFailed : Number of messages failed to receive in the current interval
-     * numAcksSent : Number of acks sent in the current interval numAcksFailed : Number of acks failed to send in the
-     * current interval totalMsgsReceived : Total number of messages received totalBytesReceived : Total number of bytes
-     * received totalReceiveFailed : Total number of messages failed to receive totalAcksSent : Total number of acks
-     * sent totalAcksFailed : Total number of acks failed to sent
+     * <ul>
+     * <li>numMsgsReceived : Number of messages received in the current interval
+     * <li>numBytesReceived : Number of bytes received in the current interval
+     * <li>numReceiveFailed : Number of messages failed to receive in the current interval
+     * <li>numAcksSent : Number of acks sent in the current interval
+     * <li>numAcksFailed : Number of acks failed to send in the current interval
+     * <li>totalMsgsReceived : Total number of messages received
+     * <li>totalBytesReceived : Total number of bytes received
+     * <li>totalReceiveFailed : Total number of messages failed to receive
+     * <li>totalAcksSent : Total number of acks sent
+     * <li>totalAcksFailed : Total number of acks failed to sent
+     * </ul>
      *
-     * @return statistic for the consumer or null if ConsumerStats is disabled.
+     * @return statistic for the consumer
      */
     ConsumerStats getStats();
 
@@ -229,6 +231,9 @@ public interface Consumer<T> extends Closeable {
 
     /**
      * Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
+     *
+     * Please note that this does not simply mean that the consumer is caught up with the last message published by
+     * producers, rather the topic needs to be explicitly "terminated".
      */
     boolean hasReachedEndOfTopic();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
new file mode 100644
index 0000000..5901c2d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+
+/**
+ * Consumer statistics recorded by client.
+ *
+ * All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ */
+public interface ConsumerStats extends Serializable {
+
+    /**
+     * @return Number of messages received in the last interval
+     */
+    long getNumMsgsReceived();
+
+    /**
+     * @return Number of bytes received in the last interval
+     */
+    long getNumBytesReceived();
+
+    /**
+     * @return Rate of bytes per second received in the last interval
+     */
+    double getRateMsgsReceived();
+
+    /**
+     * @return Rate of bytes per second received in the last interval
+     */
+    double getRateBytesReceived();
+
+    /**
+     * @return Number of message acknowledgments sent in the last interval
+     */
+    long getNumAcksSent();
+
+    /**
+     * @return Number of message acknowledgments failed in the last interval
+     */
+    long getNumAcksFailed();
+
+    /**
+     * @return Number of message receive failed in the last interval
+     */
+    long getNumReceiveFailed();
+
+    /**
+     * @return Total number of messages received by this consumer
+     */
+    long getTotalMsgsReceived();
+
+    /**
+     * @return Total number of bytes received by this consumer
+     */
+    long getTotalBytesReceived();
+
+    /**
+     * @return Total number of messages receive failures
+     */
+    long getTotalReceivedFailed();
+
+    /**
+     * @return Total number of message acknowledgments sent by this consumer
+     */
+    long getTotalAcksSent();
+
+    /**
+     * @return Total number of message acknowledgments failures on this consumer
+     */
+    long getTotalAcksFailed();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
index fe60238..97c5c4e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -21,8 +21,6 @@ package org.apache.pulsar.client.api;
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 
-import org.apache.pulsar.client.impl.ProducerStats;
-
 /**
  * Producer object.
  *
@@ -124,13 +122,18 @@ public interface Producer<T> extends Closeable {
     /**
      * Get statistics for the producer
      *
-     * numMsgsSent : Number of messages sent in the current interval numBytesSent : Number of bytes sent in the current
-     * interval numSendFailed : Number of messages failed to send in the current interval numAcksReceived : Number of
-     * acks received in the current interval totalMsgsSent : Total number of messages sent totalBytesSent : Total number
-     * of bytes sent totalSendFailed : Total number of messages failed to send totalAcksReceived: Total number of acks
-     * received
+     * <ul>
+     * <li>numMsgsSent : Number of messages sent in the current interval
+     * <li>numBytesSent : Number of bytes sent in the current interval
+     * <li>numSendFailed : Number of messages failed to send in the current interval
+     * <li>numAcksReceived : Number of acks received in the current interval
+     * <li>totalMsgsSent : Total number of messages sent
+     * <li>totalBytesSent : Total number of bytes sent
+     * <li>totalSendFailed : Total number of messages failed to send
+     * <li>totalAcksReceived: Total number of acks received
+     * </ul>
      *
-     * @return statistic for the producer or null if ProducerStats is disabled.
+     * @return statistic for the producer or null if ProducerStatsRecorderImpl is disabled.
      */
     ProducerStats getStats();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
new file mode 100644
index 0000000..552c61a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+
+/**
+ * Producer statistics recorded by client.
+ *
+ * All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ */
+public interface ProducerStats extends Serializable {
+    /**
+     * @return the number of messages published in the last interval
+     */
+    long getNumMsgsSent();
+
+    /**
+     * @return the number of bytes sent in the last interval
+     */
+    long getNumBytesSent();
+
+    /**
+     * @return the number of failed send operations in the last interval
+     */
+    long getNumSendFailed();
+
+    /**
+     * @return the number of send acknowledges received by broker in the last interval
+     */
+    long getNumAcksReceived();
+
+    /**
+     * @return the messages send rate in the last interval
+     */
+    double getSendMsgsRate();
+
+    /**
+     * @return the bytes send rate in the last interval
+     */
+    double getSendBytesRate();
+
+    /**
+     * @return the 50th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis50pct();
+
+    /**
+     * @return the 75th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis75pct();
+
+    /**
+     * @return the 95th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis95pct();
+
+    /**
+     * @return the 99th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis99pct();
+
+    /**
+     * @return the 99.9th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis999pct();
+
+    /**
+     * @return the max send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillisMax();
+
+    /**
+     * @return the total number of messages published by this producer
+     */
+    long getTotalMsgsSent();
+
+    /**
+     * @return the total number of bytes sent by this producer
+     */
+    long getTotalBytesSent();
+
+    /**
+     * @return the total number of failed send operations
+     */
+    long getTotalSendFailed();
+
+    /**
+     * @return the total number of send acknowledges received by broker
+     */
+    long getTotalAcksReceived();
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 183651a..ab812a0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,11 +25,6 @@ import static java.lang.String.format;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
-import com.google.common.collect.Iterables;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -52,8 +47,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -75,6 +72,13 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
 public class ConsumerImpl<T> extends ConsumerBase<T> {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
@@ -106,7 +110,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConcurrentNavigableMap<MessageIdImpl, BitSet> batchMessageAckTracker;
 
-    protected final ConsumerStats stats;
+    protected final ConsumerStatsRecorder stats;
     private final int priorityLevel;
     private final SubscriptionMode subscriptionMode;
     private BatchMessageIdImpl startMessageId;
@@ -150,9 +154,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
         this.readCompacted = conf.isReadCompacted();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
-            stats = new ConsumerStats(client, conf, this);
+            stats = new ConsumerStatsRecorderImpl(client, conf, this);
         } else {
-            stats = ConsumerStats.CONSUMER_STATS_DISABLED;
+            stats = ConsumerStatsDisabled.INSTANCE;
         }
 
         if (conf.getReceiverQueueSize() <= 1) {
@@ -698,10 +702,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
             return CompletableFuture.completedFuture(null);
         }
 
-        Timeout timeout = stats.getStatTimeout();
-        if (timeout != null) {
-            timeout.cancel();
-        }
+        stats.getStatTimeout().ifPresent(Timeout::cancel);
 
         setState(State.Closing);
 
@@ -1408,9 +1409,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public ConsumerStats getStats() {
-        if (stats instanceof ConsumerStatsDisabled) {
-            return null;
-        }
         return stats;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index 08b189c..717acfa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -18,29 +18,110 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.Optional;
+
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 
-public class ConsumerStatsDisabled extends ConsumerStats {
+import io.netty.util.Timeout;
+
+public class ConsumerStatsDisabled implements ConsumerStatsRecorder {
     private static final long serialVersionUID = 1L;
 
+    static final ConsumerStatsRecorder INSTANCE = new ConsumerStatsDisabled();
+
     @Override
-    void updateNumMsgsReceived(Message<?> message) {
+    public void updateNumMsgsReceived(Message<?> message) {
         // Do nothing
     }
 
     @Override
-    void incrementNumReceiveFailed() {
+    public void incrementNumReceiveFailed() {
         // Do nothing
     }
 
     @Override
-    void incrementNumAcksSent(long numAcks) {
+    public void incrementNumAcksSent(long numAcks) {
         // Do nothing
     }
 
     @Override
-    void incrementNumAcksFailed() {
+    public void incrementNumAcksFailed() {
         // Do nothing
     }
 
+    @Override
+    public long getNumMsgsReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getNumBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getNumAcksSent() {
+        return 0;
+    }
+
+    @Override
+    public long getNumAcksFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getNumReceiveFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalMsgsReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalReceivedFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalAcksSent() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalAcksFailed() {
+        return 0;
+    }
+
+    @Override
+    public double getRateMsgsReceived() {
+        return 0;
+    }
+
+    @Override
+    public double getRateBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public Optional<Timeout> getStatTimeout() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void reset() {
+        // do nothing
+    }
+
+    @Override
+    public void updateCumulativeStats(ConsumerStats stats) {
+        // do nothing
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
similarity index 66%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
index 08b189c..f29ca09 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
@@ -18,29 +18,25 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.Optional;
+
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 
-public class ConsumerStatsDisabled extends ConsumerStats {
-    private static final long serialVersionUID = 1L;
+import io.netty.util.Timeout;
+
+public interface ConsumerStatsRecorder extends ConsumerStats {
+    void updateNumMsgsReceived(Message<?> message);
+
+    void incrementNumAcksSent(long numAcks);
 
-    @Override
-    void updateNumMsgsReceived(Message<?> message) {
-        // Do nothing
-    }
+    void incrementNumAcksFailed();
 
-    @Override
-    void incrementNumReceiveFailed() {
-        // Do nothing
-    }
+    void incrementNumReceiveFailed();
 
-    @Override
-    void incrementNumAcksSent(long numAcks) {
-        // Do nothing
-    }
+    Optional<Timeout> getStatTimeout();
 
-    @Override
-    void incrementNumAcksFailed() {
-        // Do nothing
-    }
+    void reset();
 
+    void updateCumulativeStats(ConsumerStats stats);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
similarity index 76%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 21ff3c4..01c51cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.client.impl;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.text.DecimalFormat;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.slf4j.Logger;
@@ -36,7 +37,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 
-public class ConsumerStats implements Serializable {
+public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
 
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
@@ -56,11 +57,12 @@ public class ConsumerStats implements Serializable {
     private final LongAdder totalAcksSent;
     private final LongAdder totalAcksFailed;
 
-    private final DecimalFormat throughputFormat;
+    private volatile double receivedMsgsRate;
+    private volatile double receivedBytesRate;
 
-    public static final ConsumerStats CONSUMER_STATS_DISABLED = new ConsumerStatsDisabled();
+    private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
 
-    public ConsumerStats() {
+    public ConsumerStatsRecorderImpl() {
         numMsgsReceived = null;
         numBytesReceived = null;
         numReceiveFailed = null;
@@ -71,10 +73,10 @@ public class ConsumerStats implements Serializable {
         totalReceiveFailed = null;
         totalAcksSent = null;
         totalAcksFailed = null;
-        throughputFormat = null;
     }
 
-    public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf, ConsumerImpl<?> consumer) {
+    public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf,
+            ConsumerImpl<?> consumer) {
         this.pulsarClient = pulsarClient;
         this.consumer = consumer;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -88,7 +90,6 @@ public class ConsumerStats implements Serializable {
         totalReceiveFailed = new LongAdder();
         totalAcksSent = new LongAdder();
         totalAcksFailed = new LongAdder();
-        throughputFormat = new DecimalFormat("0.00");
         init(conf);
     }
 
@@ -124,6 +125,9 @@ public class ConsumerStats implements Serializable {
                 totalAcksSent.add(currentNumAcksSent);
                 totalAcksFailed.add(currentNumAcksFailed);
 
+                receivedMsgsRate = currentNumMsgsReceived / elapsed;
+                receivedBytesRate = currentNumBytesReceived / elapsed;
+
                 if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent
                         | currentNumAcksFailed) != 0) {
                     log.info(
@@ -131,9 +135,9 @@ public class ConsumerStats implements Serializable {
                                     + "Throughput received: {} msg/s --- {} Mbit/s --- "
                                     + "Ack sent rate: {} ack/s --- " + "Failed messages: {} --- " + "Failed acks: {}",
                             consumer.getTopic(), consumer.getSubscription(), consumer.consumerName,
-                            consumer.incomingMessages.size(), throughputFormat.format(currentNumMsgsReceived / elapsed),
-                            throughputFormat.format(currentNumBytesReceived / elapsed * 8 / 1024 / 1024),
-                            throughputFormat.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
+                            consumer.incomingMessages.size(), THROUGHPUT_FORMAT.format(receivedMsgsRate),
+                            THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024),
+                            THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
                             currentNumAcksFailed);
                 }
             } catch (Exception e) {
@@ -149,30 +153,36 @@ public class ConsumerStats implements Serializable {
         statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
     }
 
-    void updateNumMsgsReceived(Message<?> message) {
+    @Override
+    public void updateNumMsgsReceived(Message<?> message) {
         if (message != null) {
             numMsgsReceived.increment();
             numBytesReceived.add(message.getData().length);
         }
     }
 
-    void incrementNumAcksSent(long numAcks) {
+    @Override
+    public void incrementNumAcksSent(long numAcks) {
         numAcksSent.add(numAcks);
     }
 
-    void incrementNumAcksFailed() {
+    @Override
+    public void incrementNumAcksFailed() {
         numAcksFailed.increment();
     }
 
-    void incrementNumReceiveFailed() {
+    @Override
+    public void incrementNumReceiveFailed() {
         numReceiveFailed.increment();
     }
 
-    Timeout getStatTimeout() {
-        return statTimeout;
+    @Override
+    public Optional<Timeout> getStatTimeout() {
+        return Optional.ofNullable(statTimeout);
     }
 
-    void reset() {
+    @Override
+    public void reset() {
         numMsgsReceived.reset();
         numBytesReceived.reset();
         numReceiveFailed.reset();
@@ -185,20 +195,21 @@ public class ConsumerStats implements Serializable {
         totalAcksFailed.reset();
     }
 
-    void updateCumulativeStats(ConsumerStats stats) {
+    @Override
+    public void updateCumulativeStats(ConsumerStats stats) {
         if (stats == null) {
             return;
         }
-        numMsgsReceived.add(stats.numMsgsReceived.longValue());
-        numBytesReceived.add(stats.numBytesReceived.longValue());
-        numReceiveFailed.add(stats.numReceiveFailed.longValue());
-        numAcksSent.add(stats.numAcksSent.longValue());
-        numAcksFailed.add(stats.numAcksFailed.longValue());
-        totalMsgsReceived.add(stats.totalMsgsReceived.longValue());
-        totalBytesReceived.add(stats.totalBytesReceived.longValue());
-        totalReceiveFailed.add(stats.totalReceiveFailed.longValue());
-        totalAcksSent.add(stats.totalAcksSent.longValue());
-        totalAcksFailed.add(stats.totalAcksFailed.longValue());
+        numMsgsReceived.add(stats.getNumMsgsReceived());
+        numBytesReceived.add(stats.getNumBytesReceived());
+        numReceiveFailed.add(stats.getNumReceiveFailed());
+        numAcksSent.add(stats.getNumAcksSent());
+        numAcksFailed.add(stats.getNumAcksFailed());
+        totalMsgsReceived.add(stats.getTotalMsgsReceived());
+        totalBytesReceived.add(stats.getTotalBytesReceived());
+        totalReceiveFailed.add(stats.getTotalReceivedFailed());
+        totalAcksSent.add(stats.getTotalAcksSent());
+        totalAcksFailed.add(stats.getTotalAcksFailed());
     }
 
     public long getNumMsgsReceived() {
@@ -241,5 +252,15 @@ public class ConsumerStats implements Serializable {
         return totalAcksFailed.longValue();
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ConsumerStats.class);
+    @Override
+    public double getRateMsgsReceived() {
+        return receivedMsgsRate;
+    }
+
+    @Override
+    public double getRateBytesReceived() {
+        return receivedBytesRate;
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsRecorderImpl.class);
 }
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index df6894c..f46c2b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -63,7 +63,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
 
     private final int numPartitions;
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ConsumerStats stats;
+    private final ConsumerStatsRecorderImpl stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
 
     PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, int numPartitions,
@@ -81,7 +81,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
             this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
         }
 
-        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
         checkArgument(conf.getReceiverQueueSize() > 0,
                 "Receiver queue size needs to be greater than 0 for Partitioned Topics");
         start();
@@ -530,7 +530,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     @Override
-    public synchronized ConsumerStats getStats() {
+    public synchronized ConsumerStatsRecorderImpl getStats() {
         if (stats == null) {
             return null;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5281d51..23a86e6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -47,7 +47,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
     private List<ProducerImpl<T>> producers;
     private MessageRouter routerPolicy;
-    private final ProducerStats stats;
+    private final ProducerStatsRecorderImpl stats;
     private final TopicMetadata topicMetadata;
 
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
@@ -56,7 +56,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
         this.producers = Lists.newArrayListWithCapacity(numPartitions);
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
-        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null;
+        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
 
         int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
                 conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
@@ -204,7 +204,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
     }
 
     @Override
-    public synchronized ProducerStats getStats() {
+    public synchronized ProducerStatsRecorderImpl getStats() {
         if (stats == null) {
             return null;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 15779cd..8f0586b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -93,7 +93,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
     private String connectedSince;
     private final int partitionIndex;
 
-    private final ProducerStats stats;
+    private final ProducerStatsRecorder stats;
 
     private final CompressionCodec compressor;
 
@@ -161,9 +161,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
             this.batchMessageContainer = null;
         }
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
-            stats = new ProducerStats(client, conf, this);
+            stats = new ProducerStatsRecorderImpl(client, conf, this);
         } else {
-            stats = ProducerStats.PRODUCER_STATS_DISABLED;
+            stats = ProducerStatsDisabled.INSTANCE;
         }
 
         if (conf.getProperties().isEmpty()) {
@@ -1240,10 +1240,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
     }
 
     @Override
-    public ProducerStats getStats() {
-        if (stats instanceof ProducerStatsDisabled) {
-            return null;
-        }
+    public ProducerStatsRecorder getStats() {
         return stats;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
index 3c80f62..214f46c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
@@ -18,27 +18,113 @@
  */
 package org.apache.pulsar.client.impl;
 
-public class ProducerStatsDisabled extends ProducerStats {
+public class ProducerStatsDisabled implements ProducerStatsRecorder {
     private static final long serialVersionUID = 1L;
 
+    static final ProducerStatsRecorder INSTANCE = new ProducerStatsDisabled();
+
     @Override
-    void incrementSendFailed() {
+    public void incrementSendFailed() {
         // Do nothing
     }
 
     @Override
-    void incrementSendFailed(long numMsgs) {
+    public void incrementSendFailed(long numMsgs) {
         // Do nothing
     }
 
     @Override
-    void incrementNumAcksReceived(long latencyNs) {
+    public void incrementNumAcksReceived(long latencyNs) {
         // Do nothing
     }
 
     @Override
-    void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
+    public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         // Do nothing
     }
 
+    @Override
+    public void cancelStatsTimeout() {
+        // Do nothing
+    }
+
+    @Override
+    public long getNumMsgsSent() {
+        return 0;
+    }
+
+    @Override
+    public long getNumBytesSent() {
+        return 0;
+    }
+
+    @Override
+    public long getNumSendFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getNumAcksReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalMsgsSent() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalBytesSent() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalSendFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalAcksReceived() {
+        return 0;
+    }
+
+    @Override
+    public double getSendMsgsRate() {
+        return 0;
+    }
+
+    @Override
+    public double getSendBytesRate() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis50pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis75pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis95pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis99pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis999pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillisMax() {
+        return 0;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
similarity index 64%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
index 3c80f62..4c9bd9e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
@@ -18,27 +18,16 @@
  */
 package org.apache.pulsar.client.impl;
 
-public class ProducerStatsDisabled extends ProducerStats {
-    private static final long serialVersionUID = 1L;
+import org.apache.pulsar.client.api.ProducerStats;
 
-    @Override
-    void incrementSendFailed() {
-        // Do nothing
-    }
+public interface ProducerStatsRecorder extends ProducerStats {
+    void updateNumMsgsSent(long numMsgs, long totalMsgsSize);
 
-    @Override
-    void incrementSendFailed(long numMsgs) {
-        // Do nothing
-    }
+    void incrementSendFailed();
 
-    @Override
-    void incrementNumAcksReceived(long latencyNs) {
-        // Do nothing
-    }
+    void incrementSendFailed(long numMsgs);
 
-    @Override
-    void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
-        // Do nothing
-    }
+    void incrementNumAcksReceived(long latencyNs);
 
+    void cancelStatsTimeout();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
similarity index 65%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 2628003..6bea137 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -19,11 +19,11 @@
 package org.apache.pulsar.client.impl;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@ import com.yahoo.sketches.quantiles.DoublesSketch;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 
-public class ProducerStats implements Serializable {
+public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
 
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
@@ -53,14 +53,17 @@ public class ProducerStats implements Serializable {
     private final LongAdder totalBytesSent;
     private final LongAdder totalSendFailed;
     private final LongAdder totalAcksReceived;
-    private final DecimalFormat dec;
-    private final DecimalFormat throughputFormat;
+    private static final DecimalFormat DEC = new DecimalFormat("0.000");
+    private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
     private final DoublesSketch ds;
-    private final double[] percentiles = { 0.5, 0.95, 0.99, 0.999, 0.9999 };
 
-    public static final ProducerStats PRODUCER_STATS_DISABLED = new ProducerStatsDisabled();
+    private volatile double sendMsgsRate;
+    private volatile double sendBytesRate;
+    private volatile double[] latencyPctValues;
 
-    public ProducerStats() {
+    private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 0.999, 1.0 };
+
+    public ProducerStatsRecorderImpl() {
         numMsgsSent = null;
         numBytesSent = null;
         numSendFailed = null;
@@ -69,12 +72,11 @@ public class ProducerStats implements Serializable {
         totalBytesSent = null;
         totalSendFailed = null;
         totalAcksReceived = null;
-        dec = null;
-        throughputFormat = null;
         ds = null;
     }
 
-    public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl<?> producer) {
+    public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, ProducerConfigurationData conf,
+            ProducerImpl<?> producer) {
         this.pulsarClient = pulsarClient;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
         this.producer = producer;
@@ -87,8 +89,6 @@ public class ProducerStats implements Serializable {
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
         ds = DoublesSketch.builder().build(256);
-        dec = new DecimalFormat("0.000");
-        throughputFormat = new DecimalFormat("0.00");
         init(conf);
     }
 
@@ -125,32 +125,33 @@ public class ProducerStats implements Serializable {
                 totalSendFailed.add(currentNumSendFailedMsgs);
                 totalAcksReceived.add(currentNumAcksReceived);
 
-                double[] percentileValues;
                 synchronized (ds) {
-                    percentileValues = ds.getQuantiles(percentiles);
+                    latencyPctValues = ds.getQuantiles(PERCENTILES);
                     ds.reset();
                 }
 
+                sendMsgsRate = currentNumMsgsSent / elapsed;
+                sendBytesRate = currentNumBytesSent / elapsed;
+
                 if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
                         | currentNumMsgsSent) != 0) {
 
-                    for (int i = 0; i < percentileValues.length; i++) {
-                        if (percentileValues[i] == Double.NaN) {
-                            percentileValues[i] = 0;
+                    for (int i = 0; i < latencyPctValues.length; i++) {
+                        if (latencyPctValues[i] == Double.NaN) {
+                            latencyPctValues[i] = 0;
                         }
                     }
 
-                    log.info(
-                            "[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
-                                    + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms --- "
-                                    + "Ack received rate: {} ack/s --- Failed messages: {}",
-                            producer.getTopic(), producer.getProducerName(), producer.getPendingQueueSize(),
-                            throughputFormat.format(currentNumMsgsSent / elapsed),
-                            throughputFormat.format(currentNumBytesSent / elapsed / 1024 / 1024 * 8),
-                            dec.format(percentileValues[0] / 1000.0), dec.format(percentileValues[1] / 1000.0),
-                            dec.format(percentileValues[2] / 1000.0), dec.format(percentileValues[3] / 1000.0),
-                            dec.format(percentileValues[4] / 1000.0),
-                            throughputFormat.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
+                    log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
+                            + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
+                            + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
+                            producer.getProducerName(), producer.getPendingQueueSize(),
+                            THROUGHPUT_FORMAT.format(sendMsgsRate),
+                            THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
+                            DEC.format(latencyPctValues[0] / 1000.0), DEC.format(latencyPctValues[2] / 1000.0),
+                            DEC.format(latencyPctValues[3] / 1000.0), DEC.format(latencyPctValues[4] / 1000.0),
+                            DEC.format(latencyPctValues[5] / 1000.0),
+                            THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
                 }
 
             } catch (Exception e) {
@@ -170,20 +171,24 @@ public class ProducerStats implements Serializable {
         return statTimeout;
     }
 
-    void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
+    @Override
+    public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         numMsgsSent.add(numMsgs);
         numBytesSent.add(totalMsgsSize);
     }
 
-    void incrementSendFailed() {
+    @Override
+    public void incrementSendFailed() {
         numSendFailed.increment();
     }
 
-    void incrementSendFailed(long numMsgs) {
+    @Override
+    public void incrementSendFailed(long numMsgs) {
         numSendFailed.add(numMsgs);
     }
 
-    void incrementNumAcksReceived(long latencyNs) {
+    @Override
+    public void incrementNumAcksReceived(long latencyNs) {
         numAcksReceived.increment();
         synchronized (ds) {
             ds.update(TimeUnit.NANOSECONDS.toMicros(latencyNs));
@@ -205,28 +210,32 @@ public class ProducerStats implements Serializable {
         if (stats == null) {
             return;
         }
-        numMsgsSent.add(stats.numMsgsSent.longValue());
-        numBytesSent.add(stats.numBytesSent.longValue());
-        numSendFailed.add(stats.numSendFailed.longValue());
-        numAcksReceived.add(stats.numAcksReceived.longValue());
-        totalMsgsSent.add(stats.numMsgsSent.longValue());
-        totalBytesSent.add(stats.numBytesSent.longValue());
-        totalSendFailed.add(stats.numSendFailed.longValue());
-        totalAcksReceived.add(stats.numAcksReceived.longValue());
+        numMsgsSent.add(stats.getNumMsgsSent());
+        numBytesSent.add(stats.getNumBytesSent());
+        numSendFailed.add(stats.getNumSendFailed());
+        numAcksReceived.add(stats.getNumAcksReceived());
+        totalMsgsSent.add(stats.getNumMsgsSent());
+        totalBytesSent.add(stats.getNumBytesSent());
+        totalSendFailed.add(stats.getNumSendFailed());
+        totalAcksReceived.add(stats.getNumAcksReceived());
     }
 
+    @Override
     public long getNumMsgsSent() {
         return numMsgsSent.longValue();
     }
 
+    @Override
     public long getNumBytesSent() {
         return numBytesSent.longValue();
     }
 
+    @Override
     public long getNumSendFailed() {
         return numSendFailed.longValue();
     }
 
+    @Override
     public long getNumAcksReceived() {
         return numAcksReceived.longValue();
     }
@@ -247,6 +256,46 @@ public class ProducerStats implements Serializable {
         return totalAcksReceived.longValue();
     }
 
+    @Override
+    public double getSendMsgsRate() {
+        return sendMsgsRate;
+    }
+
+    @Override
+    public double getSendBytesRate() {
+        return sendBytesRate;
+    }
+
+    @Override
+    public double getSendLatencyMillis50pct() {
+        return latencyPctValues[0];
+    }
+
+    @Override
+    public double getSendLatencyMillis75pct() {
+        return latencyPctValues[1];
+    }
+
+    @Override
+    public double getSendLatencyMillis95pct() {
+        return latencyPctValues[2];
+    }
+
+    @Override
+    public double getSendLatencyMillis99pct() {
+        return latencyPctValues[3];
+    }
+
+    @Override
+    public double getSendLatencyMillis999pct() {
+        return latencyPctValues[4];
+    }
+
+    @Override
+    public double getSendLatencyMillisMax() {
+        return latencyPctValues[5];
+    }
+
     public void cancelStatsTimeout() {
         if (statTimeout != null) {
             statTimeout.cancel();
@@ -254,5 +303,5 @@ public class ProducerStats implements Serializable {
         }
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProducerStats.class);
+    private static final Logger log = LoggerFactory.getLogger(ProducerStatsRecorderImpl.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index 929e6f7..e8c018c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +39,9 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -47,14 +50,12 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     // All topics should be in same namespace
@@ -78,7 +79,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
     AtomicInteger numberTopicPartitions;
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ConsumerStats stats;
+    private final ConsumerStatsRecorder stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
@@ -103,7 +104,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
         }
 
         this.internalConfig = getInternalConsumerConfig();
-        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
 
         if (conf.getTopicNames().isEmpty()) {
             this.namespaceName = null;
@@ -670,7 +671,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
                         client.externalExecutorProvider().getExecutor(), 0, subFuture, schema);
                 consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
-                futureList = Lists.newArrayList(subFuture);
+                futureList = Collections.singletonList(subFuture);
             }
 
             FutureUtil.waitForAll(futureList)
@@ -809,7 +810,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     // get partitioned consumers
-    public List<ConsumerImpl> getConsumers() {
+    public List<ConsumerImpl<T>> getConsumers() {
         return consumers.values().stream().collect(Collectors.toList());
     }
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.