You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/05 23:44:28 UTC

[GitHub] merlimat closed pull request #1321: Improved API for producer/consumer stats

merlimat closed pull request #1321: Improved API for producer/consumer stats
URL: https://github.com/apache/incubator-pulsar/pull/1321
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 30a2c25f2..8a536f2a3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -34,8 +34,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.impl.ConsumerStats;
-import org.apache.pulsar.client.impl.ProducerStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 10367b8ad..4f9cc7002 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -163,8 +163,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern());
         List<String> topics = ((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
-        @SuppressWarnings("rawtypes")
-        List<ConsumerImpl> consumers = ((PatternTopicsConsumerImpl<?>) consumer).getConsumers();
+        List<ConsumerImpl<byte[]>> consumers = ((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
index a1dd628c1..876b6d21b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -22,12 +22,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.client.impl.ConsumerStats;
-
 /**
  * An interface that abstracts behavior of Pulsar's consumer.
- *
- *
  */
 public interface Consumer<T> extends Closeable {
 
@@ -201,16 +197,22 @@
     CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
 
     /**
-     * Get statistics for the consumer
+     * Get statistics for the consumer.
      *
-     * numMsgsReceived : Number of messages received in the current interval numBytesReceived : Number of bytes received
-     * in the current interval numReceiveFailed : Number of messages failed to receive in the current interval
-     * numAcksSent : Number of acks sent in the current interval numAcksFailed : Number of acks failed to send in the
-     * current interval totalMsgsReceived : Total number of messages received totalBytesReceived : Total number of bytes
-     * received totalReceiveFailed : Total number of messages failed to receive totalAcksSent : Total number of acks
-     * sent totalAcksFailed : Total number of acks failed to sent
+     * <ul>
+     * <li>numMsgsReceived : Number of messages received in the current interval
+     * <li>numBytesReceived : Number of bytes received in the current interval
+     * <li>numReceiveFailed : Number of messages failed to receive in the current interval
+     * <li>numAcksSent : Number of acks sent in the current interval
+     * <li>numAcksFailed : Number of acks failed to send in the current interval
+     * <li>totalMsgsReceived : Total number of messages received
+     * <li>totalBytesReceived : Total number of bytes received
+     * <li>totalReceiveFailed : Total number of messages failed to receive
+     * <li>totalAcksSent : Total number of acks sent
+     * <li>totalAcksFailed : Total number of acks failed to sent
+     * </ul>
      *
-     * @return statistic for the consumer or null if ConsumerStats is disabled.
+     * @return statistic for the consumer
      */
     ConsumerStats getStats();
 
@@ -229,6 +231,9 @@
 
     /**
      * Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
+     *
+     * Please note that this does not simply mean that the consumer is caught up with the last message published by
+     * producers, rather the topic needs to be explicitly "terminated".
      */
     boolean hasReachedEndOfTopic();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
new file mode 100644
index 000000000..5901c2d1c
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+
+/**
+ * Consumer statistics recorded by client.
+ *
+ * All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ */
+public interface ConsumerStats extends Serializable {
+
+    /**
+     * @return Number of messages received in the last interval
+     */
+    long getNumMsgsReceived();
+
+    /**
+     * @return Number of bytes received in the last interval
+     */
+    long getNumBytesReceived();
+
+    /**
+     * @return Rate of bytes per second received in the last interval
+     */
+    double getRateMsgsReceived();
+
+    /**
+     * @return Rate of bytes per second received in the last interval
+     */
+    double getRateBytesReceived();
+
+    /**
+     * @return Number of message acknowledgments sent in the last interval
+     */
+    long getNumAcksSent();
+
+    /**
+     * @return Number of message acknowledgments failed in the last interval
+     */
+    long getNumAcksFailed();
+
+    /**
+     * @return Number of message receive failed in the last interval
+     */
+    long getNumReceiveFailed();
+
+    /**
+     * @return Total number of messages received by this consumer
+     */
+    long getTotalMsgsReceived();
+
+    /**
+     * @return Total number of bytes received by this consumer
+     */
+    long getTotalBytesReceived();
+
+    /**
+     * @return Total number of messages receive failures
+     */
+    long getTotalReceivedFailed();
+
+    /**
+     * @return Total number of message acknowledgments sent by this consumer
+     */
+    long getTotalAcksSent();
+
+    /**
+     * @return Total number of message acknowledgments failures on this consumer
+     */
+    long getTotalAcksFailed();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
index fe60238a9..97c5c4eeb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -21,8 +21,6 @@
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 
-import org.apache.pulsar.client.impl.ProducerStats;
-
 /**
  * Producer object.
  *
@@ -124,13 +122,18 @@
     /**
      * Get statistics for the producer
      *
-     * numMsgsSent : Number of messages sent in the current interval numBytesSent : Number of bytes sent in the current
-     * interval numSendFailed : Number of messages failed to send in the current interval numAcksReceived : Number of
-     * acks received in the current interval totalMsgsSent : Total number of messages sent totalBytesSent : Total number
-     * of bytes sent totalSendFailed : Total number of messages failed to send totalAcksReceived: Total number of acks
-     * received
+     * <ul>
+     * <li>numMsgsSent : Number of messages sent in the current interval
+     * <li>numBytesSent : Number of bytes sent in the current interval
+     * <li>numSendFailed : Number of messages failed to send in the current interval
+     * <li>numAcksReceived : Number of acks received in the current interval
+     * <li>totalMsgsSent : Total number of messages sent
+     * <li>totalBytesSent : Total number of bytes sent
+     * <li>totalSendFailed : Total number of messages failed to send
+     * <li>totalAcksReceived: Total number of acks received
+     * </ul>
      *
-     * @return statistic for the producer or null if ProducerStats is disabled.
+     * @return statistic for the producer or null if ProducerStatsRecorderImpl is disabled.
      */
     ProducerStats getStats();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
new file mode 100644
index 000000000..552c61ae6
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+
+/**
+ * Producer statistics recorded by client.
+ *
+ * All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ */
+public interface ProducerStats extends Serializable {
+    /**
+     * @return the number of messages published in the last interval
+     */
+    long getNumMsgsSent();
+
+    /**
+     * @return the number of bytes sent in the last interval
+     */
+    long getNumBytesSent();
+
+    /**
+     * @return the number of failed send operations in the last interval
+     */
+    long getNumSendFailed();
+
+    /**
+     * @return the number of send acknowledges received by broker in the last interval
+     */
+    long getNumAcksReceived();
+
+    /**
+     * @return the messages send rate in the last interval
+     */
+    double getSendMsgsRate();
+
+    /**
+     * @return the bytes send rate in the last interval
+     */
+    double getSendBytesRate();
+
+    /**
+     * @return the 50th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis50pct();
+
+    /**
+     * @return the 75th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis75pct();
+
+    /**
+     * @return the 95th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis95pct();
+
+    /**
+     * @return the 99th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis99pct();
+
+    /**
+     * @return the 99.9th percentile of the send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillis999pct();
+
+    /**
+     * @return the max send latency in milliseconds for the last interval
+     */
+    double getSendLatencyMillisMax();
+
+    /**
+     * @return the total number of messages published by this producer
+     */
+    long getTotalMsgsSent();
+
+    /**
+     * @return the total number of bytes sent by this producer
+     */
+    long getTotalBytesSent();
+
+    /**
+     * @return the total number of failed send operations
+     */
+    long getTotalSendFailed();
+
+    /**
+     * @return the total number of send acknowledges received by broker
+     */
+    long getTotalAcksReceived();
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 183651a1e..ab812a0b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,11 +25,6 @@
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
-import com.google.common.collect.Iterables;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -52,8 +47,10 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -75,6 +72,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
 public class ConsumerImpl<T> extends ConsumerBase<T> {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
@@ -106,7 +110,7 @@
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConcurrentNavigableMap<MessageIdImpl, BitSet> batchMessageAckTracker;
 
-    protected final ConsumerStats stats;
+    protected final ConsumerStatsRecorder stats;
     private final int priorityLevel;
     private final SubscriptionMode subscriptionMode;
     private BatchMessageIdImpl startMessageId;
@@ -150,9 +154,9 @@
         this.readCompacted = conf.isReadCompacted();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
-            stats = new ConsumerStats(client, conf, this);
+            stats = new ConsumerStatsRecorderImpl(client, conf, this);
         } else {
-            stats = ConsumerStats.CONSUMER_STATS_DISABLED;
+            stats = ConsumerStatsDisabled.INSTANCE;
         }
 
         if (conf.getReceiverQueueSize() <= 1) {
@@ -698,10 +702,7 @@ void connectionFailed(PulsarClientException exception) {
             return CompletableFuture.completedFuture(null);
         }
 
-        Timeout timeout = stats.getStatTimeout();
-        if (timeout != null) {
-            timeout.cancel();
-        }
+        stats.getStatTimeout().ifPresent(Timeout::cancel);
 
         setState(State.Closing);
 
@@ -1408,9 +1409,6 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
 
     @Override
     public ConsumerStats getStats() {
-        if (stats instanceof ConsumerStatsDisabled) {
-            return null;
-        }
         return stats;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index 08b189cc1..717acfaec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -18,29 +18,110 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.Optional;
+
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 
-public class ConsumerStatsDisabled extends ConsumerStats {
+import io.netty.util.Timeout;
+
+public class ConsumerStatsDisabled implements ConsumerStatsRecorder {
     private static final long serialVersionUID = 1L;
 
+    static final ConsumerStatsRecorder INSTANCE = new ConsumerStatsDisabled();
+
     @Override
-    void updateNumMsgsReceived(Message<?> message) {
+    public void updateNumMsgsReceived(Message<?> message) {
         // Do nothing
     }
 
     @Override
-    void incrementNumReceiveFailed() {
+    public void incrementNumReceiveFailed() {
         // Do nothing
     }
 
     @Override
-    void incrementNumAcksSent(long numAcks) {
+    public void incrementNumAcksSent(long numAcks) {
         // Do nothing
     }
 
     @Override
-    void incrementNumAcksFailed() {
+    public void incrementNumAcksFailed() {
         // Do nothing
     }
 
+    @Override
+    public long getNumMsgsReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getNumBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getNumAcksSent() {
+        return 0;
+    }
+
+    @Override
+    public long getNumAcksFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getNumReceiveFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalMsgsReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalReceivedFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalAcksSent() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalAcksFailed() {
+        return 0;
+    }
+
+    @Override
+    public double getRateMsgsReceived() {
+        return 0;
+    }
+
+    @Override
+    public double getRateBytesReceived() {
+        return 0;
+    }
+
+    @Override
+    public Optional<Timeout> getStatTimeout() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void reset() {
+        // do nothing
+    }
+
+    @Override
+    public void updateCumulativeStats(ConsumerStats stats) {
+        // do nothing
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
new file mode 100644
index 000000000..f29ca0949
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Optional;
+
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+
+import io.netty.util.Timeout;
+
+public interface ConsumerStatsRecorder extends ConsumerStats {
+    void updateNumMsgsReceived(Message<?> message);
+
+    void incrementNumAcksSent(long numAcks);
+
+    void incrementNumAcksFailed();
+
+    void incrementNumReceiveFailed();
+
+    Optional<Timeout> getStatTimeout();
+
+    void reset();
+
+    void updateCumulativeStats(ConsumerStats stats);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
similarity index 76%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 21ff3c436..01c51cc10 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.client.impl;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.text.DecimalFormat;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.slf4j.Logger;
@@ -36,7 +37,7 @@
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 
-public class ConsumerStats implements Serializable {
+public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
 
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
@@ -56,11 +57,12 @@
     private final LongAdder totalAcksSent;
     private final LongAdder totalAcksFailed;
 
-    private final DecimalFormat throughputFormat;
+    private volatile double receivedMsgsRate;
+    private volatile double receivedBytesRate;
 
-    public static final ConsumerStats CONSUMER_STATS_DISABLED = new ConsumerStatsDisabled();
+    private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
 
-    public ConsumerStats() {
+    public ConsumerStatsRecorderImpl() {
         numMsgsReceived = null;
         numBytesReceived = null;
         numReceiveFailed = null;
@@ -71,10 +73,10 @@ public ConsumerStats() {
         totalReceiveFailed = null;
         totalAcksSent = null;
         totalAcksFailed = null;
-        throughputFormat = null;
     }
 
-    public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf, ConsumerImpl<?> consumer) {
+    public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf,
+            ConsumerImpl<?> consumer) {
         this.pulsarClient = pulsarClient;
         this.consumer = consumer;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -88,7 +90,6 @@ public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?>
         totalReceiveFailed = new LongAdder();
         totalAcksSent = new LongAdder();
         totalAcksFailed = new LongAdder();
-        throughputFormat = new DecimalFormat("0.00");
         init(conf);
     }
 
@@ -124,6 +125,9 @@ private void init(ConsumerConfigurationData<?> conf) {
                 totalAcksSent.add(currentNumAcksSent);
                 totalAcksFailed.add(currentNumAcksFailed);
 
+                receivedMsgsRate = currentNumMsgsReceived / elapsed;
+                receivedBytesRate = currentNumBytesReceived / elapsed;
+
                 if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent
                         | currentNumAcksFailed) != 0) {
                     log.info(
@@ -131,9 +135,9 @@ private void init(ConsumerConfigurationData<?> conf) {
                                     + "Throughput received: {} msg/s --- {} Mbit/s --- "
                                     + "Ack sent rate: {} ack/s --- " + "Failed messages: {} --- " + "Failed acks: {}",
                             consumer.getTopic(), consumer.getSubscription(), consumer.consumerName,
-                            consumer.incomingMessages.size(), throughputFormat.format(currentNumMsgsReceived / elapsed),
-                            throughputFormat.format(currentNumBytesReceived / elapsed * 8 / 1024 / 1024),
-                            throughputFormat.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
+                            consumer.incomingMessages.size(), THROUGHPUT_FORMAT.format(receivedMsgsRate),
+                            THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024),
+                            THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
                             currentNumAcksFailed);
                 }
             } catch (Exception e) {
@@ -149,30 +153,36 @@ private void init(ConsumerConfigurationData<?> conf) {
         statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
     }
 
-    void updateNumMsgsReceived(Message<?> message) {
+    @Override
+    public void updateNumMsgsReceived(Message<?> message) {
         if (message != null) {
             numMsgsReceived.increment();
             numBytesReceived.add(message.getData().length);
         }
     }
 
-    void incrementNumAcksSent(long numAcks) {
+    @Override
+    public void incrementNumAcksSent(long numAcks) {
         numAcksSent.add(numAcks);
     }
 
-    void incrementNumAcksFailed() {
+    @Override
+    public void incrementNumAcksFailed() {
         numAcksFailed.increment();
     }
 
-    void incrementNumReceiveFailed() {
+    @Override
+    public void incrementNumReceiveFailed() {
         numReceiveFailed.increment();
     }
 
-    Timeout getStatTimeout() {
-        return statTimeout;
+    @Override
+    public Optional<Timeout> getStatTimeout() {
+        return Optional.ofNullable(statTimeout);
     }
 
-    void reset() {
+    @Override
+    public void reset() {
         numMsgsReceived.reset();
         numBytesReceived.reset();
         numReceiveFailed.reset();
@@ -185,20 +195,21 @@ void reset() {
         totalAcksFailed.reset();
     }
 
-    void updateCumulativeStats(ConsumerStats stats) {
+    @Override
+    public void updateCumulativeStats(ConsumerStats stats) {
         if (stats == null) {
             return;
         }
-        numMsgsReceived.add(stats.numMsgsReceived.longValue());
-        numBytesReceived.add(stats.numBytesReceived.longValue());
-        numReceiveFailed.add(stats.numReceiveFailed.longValue());
-        numAcksSent.add(stats.numAcksSent.longValue());
-        numAcksFailed.add(stats.numAcksFailed.longValue());
-        totalMsgsReceived.add(stats.totalMsgsReceived.longValue());
-        totalBytesReceived.add(stats.totalBytesReceived.longValue());
-        totalReceiveFailed.add(stats.totalReceiveFailed.longValue());
-        totalAcksSent.add(stats.totalAcksSent.longValue());
-        totalAcksFailed.add(stats.totalAcksFailed.longValue());
+        numMsgsReceived.add(stats.getNumMsgsReceived());
+        numBytesReceived.add(stats.getNumBytesReceived());
+        numReceiveFailed.add(stats.getNumReceiveFailed());
+        numAcksSent.add(stats.getNumAcksSent());
+        numAcksFailed.add(stats.getNumAcksFailed());
+        totalMsgsReceived.add(stats.getTotalMsgsReceived());
+        totalBytesReceived.add(stats.getTotalBytesReceived());
+        totalReceiveFailed.add(stats.getTotalReceivedFailed());
+        totalAcksSent.add(stats.getTotalAcksSent());
+        totalAcksFailed.add(stats.getTotalAcksFailed());
     }
 
     public long getNumMsgsReceived() {
@@ -241,5 +252,15 @@ public long getTotalAcksFailed() {
         return totalAcksFailed.longValue();
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ConsumerStats.class);
+    @Override
+    public double getRateMsgsReceived() {
+        return receivedMsgsRate;
+    }
+
+    @Override
+    public double getRateBytesReceived() {
+        return receivedBytesRate;
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsRecorderImpl.class);
 }
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index df6894c47..f46c2b8b4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -63,7 +63,7 @@
 
     private final int numPartitions;
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ConsumerStats stats;
+    private final ConsumerStatsRecorderImpl stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
 
     PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, int numPartitions,
@@ -81,7 +81,7 @@
             this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
         }
 
-        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
         checkArgument(conf.getReceiverQueueSize() > 0,
                 "Receiver queue size needs to be greater than 0 for Partitioned Topics");
         start();
@@ -530,7 +530,7 @@ public int numMessagesInQueue() {
     }
 
     @Override
-    public synchronized ConsumerStats getStats() {
+    public synchronized ConsumerStatsRecorderImpl getStats() {
         if (stats == null) {
             return null;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5281d5104..23a86e612 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -47,7 +47,7 @@
 
     private List<ProducerImpl<T>> producers;
     private MessageRouter routerPolicy;
-    private final ProducerStats stats;
+    private final ProducerStatsRecorderImpl stats;
     private final TopicMetadata topicMetadata;
 
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
@@ -56,7 +56,7 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
         this.producers = Lists.newArrayListWithCapacity(numPartitions);
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
-        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null;
+        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
 
         int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
                 conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
@@ -204,7 +204,7 @@ public boolean isConnected() {
     }
 
     @Override
-    public synchronized ProducerStats getStats() {
+    public synchronized ProducerStatsRecorderImpl getStats() {
         if (stats == null) {
             return null;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 15779cd92..8f0586bbd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -93,7 +93,7 @@
     private String connectedSince;
     private final int partitionIndex;
 
-    private final ProducerStats stats;
+    private final ProducerStatsRecorder stats;
 
     private final CompressionCodec compressor;
 
@@ -161,9 +161,9 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             this.batchMessageContainer = null;
         }
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
-            stats = new ProducerStats(client, conf, this);
+            stats = new ProducerStatsRecorderImpl(client, conf, this);
         } else {
-            stats = ProducerStats.PRODUCER_STATS_DISABLED;
+            stats = ProducerStatsDisabled.INSTANCE;
         }
 
         if (conf.getProperties().isEmpty()) {
@@ -1240,10 +1240,7 @@ public int getPendingQueueSize() {
     }
 
     @Override
-    public ProducerStats getStats() {
-        if (stats instanceof ProducerStatsDisabled) {
-            return null;
-        }
+    public ProducerStatsRecorder getStats() {
         return stats;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
index 3c80f6221..214f46cb8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
@@ -18,27 +18,113 @@
  */
 package org.apache.pulsar.client.impl;
 
-public class ProducerStatsDisabled extends ProducerStats {
+public class ProducerStatsDisabled implements ProducerStatsRecorder {
     private static final long serialVersionUID = 1L;
 
+    static final ProducerStatsRecorder INSTANCE = new ProducerStatsDisabled();
+
     @Override
-    void incrementSendFailed() {
+    public void incrementSendFailed() {
         // Do nothing
     }
 
     @Override
-    void incrementSendFailed(long numMsgs) {
+    public void incrementSendFailed(long numMsgs) {
         // Do nothing
     }
 
     @Override
-    void incrementNumAcksReceived(long latencyNs) {
+    public void incrementNumAcksReceived(long latencyNs) {
         // Do nothing
     }
 
     @Override
-    void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
+    public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         // Do nothing
     }
 
+    @Override
+    public void cancelStatsTimeout() {
+        // Do nothing
+    }
+
+    @Override
+    public long getNumMsgsSent() {
+        return 0;
+    }
+
+    @Override
+    public long getNumBytesSent() {
+        return 0;
+    }
+
+    @Override
+    public long getNumSendFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getNumAcksReceived() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalMsgsSent() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalBytesSent() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalSendFailed() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalAcksReceived() {
+        return 0;
+    }
+
+    @Override
+    public double getSendMsgsRate() {
+        return 0;
+    }
+
+    @Override
+    public double getSendBytesRate() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis50pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis75pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis95pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis99pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillis999pct() {
+        return 0;
+    }
+
+    @Override
+    public double getSendLatencyMillisMax() {
+        return 0;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
new file mode 100644
index 000000000..4c9bd9ec4
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.ProducerStats;
+
+public interface ProducerStatsRecorder extends ProducerStats {
+    void updateNumMsgsSent(long numMsgs, long totalMsgsSize);
+
+    void incrementSendFailed();
+
+    void incrementSendFailed(long numMsgs);
+
+    void incrementNumAcksReceived(long latencyNs);
+
+    void cancelStatsTimeout();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
similarity index 65%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 262800336..6bea137f1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -19,11 +19,11 @@
 package org.apache.pulsar.client.impl;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 
-public class ProducerStats implements Serializable {
+public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
 
     private static final long serialVersionUID = 1L;
     private TimerTask stat;
@@ -53,14 +53,17 @@
     private final LongAdder totalBytesSent;
     private final LongAdder totalSendFailed;
     private final LongAdder totalAcksReceived;
-    private final DecimalFormat dec;
-    private final DecimalFormat throughputFormat;
+    private static final DecimalFormat DEC = new DecimalFormat("0.000");
+    private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
     private final DoublesSketch ds;
-    private final double[] percentiles = { 0.5, 0.95, 0.99, 0.999, 0.9999 };
 
-    public static final ProducerStats PRODUCER_STATS_DISABLED = new ProducerStatsDisabled();
+    private volatile double sendMsgsRate;
+    private volatile double sendBytesRate;
+    private volatile double[] latencyPctValues;
 
-    public ProducerStats() {
+    private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 0.999, 1.0 };
+
+    public ProducerStatsRecorderImpl() {
         numMsgsSent = null;
         numBytesSent = null;
         numSendFailed = null;
@@ -69,12 +72,11 @@ public ProducerStats() {
         totalBytesSent = null;
         totalSendFailed = null;
         totalAcksReceived = null;
-        dec = null;
-        throughputFormat = null;
         ds = null;
     }
 
-    public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl<?> producer) {
+    public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, ProducerConfigurationData conf,
+            ProducerImpl<?> producer) {
         this.pulsarClient = pulsarClient;
         this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
         this.producer = producer;
@@ -87,8 +89,6 @@ public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData co
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
         ds = DoublesSketch.builder().build(256);
-        dec = new DecimalFormat("0.000");
-        throughputFormat = new DecimalFormat("0.00");
         init(conf);
     }
 
@@ -125,32 +125,33 @@ private void init(ProducerConfigurationData conf) {
                 totalSendFailed.add(currentNumSendFailedMsgs);
                 totalAcksReceived.add(currentNumAcksReceived);
 
-                double[] percentileValues;
                 synchronized (ds) {
-                    percentileValues = ds.getQuantiles(percentiles);
+                    latencyPctValues = ds.getQuantiles(PERCENTILES);
                     ds.reset();
                 }
 
+                sendMsgsRate = currentNumMsgsSent / elapsed;
+                sendBytesRate = currentNumBytesSent / elapsed;
+
                 if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
                         | currentNumMsgsSent) != 0) {
 
-                    for (int i = 0; i < percentileValues.length; i++) {
-                        if (percentileValues[i] == Double.NaN) {
-                            percentileValues[i] = 0;
+                    for (int i = 0; i < latencyPctValues.length; i++) {
+                        if (latencyPctValues[i] == Double.NaN) {
+                            latencyPctValues[i] = 0;
                         }
                     }
 
-                    log.info(
-                            "[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
-                                    + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms --- "
-                                    + "Ack received rate: {} ack/s --- Failed messages: {}",
-                            producer.getTopic(), producer.getProducerName(), producer.getPendingQueueSize(),
-                            throughputFormat.format(currentNumMsgsSent / elapsed),
-                            throughputFormat.format(currentNumBytesSent / elapsed / 1024 / 1024 * 8),
-                            dec.format(percentileValues[0] / 1000.0), dec.format(percentileValues[1] / 1000.0),
-                            dec.format(percentileValues[2] / 1000.0), dec.format(percentileValues[3] / 1000.0),
-                            dec.format(percentileValues[4] / 1000.0),
-                            throughputFormat.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
+                    log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
+                            + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
+                            + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
+                            producer.getProducerName(), producer.getPendingQueueSize(),
+                            THROUGHPUT_FORMAT.format(sendMsgsRate),
+                            THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
+                            DEC.format(latencyPctValues[0] / 1000.0), DEC.format(latencyPctValues[2] / 1000.0),
+                            DEC.format(latencyPctValues[3] / 1000.0), DEC.format(latencyPctValues[4] / 1000.0),
+                            DEC.format(latencyPctValues[5] / 1000.0),
+                            THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
                 }
 
             } catch (Exception e) {
@@ -170,20 +171,24 @@ Timeout getStatTimeout() {
         return statTimeout;
     }
 
-    void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
+    @Override
+    public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         numMsgsSent.add(numMsgs);
         numBytesSent.add(totalMsgsSize);
     }
 
-    void incrementSendFailed() {
+    @Override
+    public void incrementSendFailed() {
         numSendFailed.increment();
     }
 
-    void incrementSendFailed(long numMsgs) {
+    @Override
+    public void incrementSendFailed(long numMsgs) {
         numSendFailed.add(numMsgs);
     }
 
-    void incrementNumAcksReceived(long latencyNs) {
+    @Override
+    public void incrementNumAcksReceived(long latencyNs) {
         numAcksReceived.increment();
         synchronized (ds) {
             ds.update(TimeUnit.NANOSECONDS.toMicros(latencyNs));
@@ -205,28 +210,32 @@ void updateCumulativeStats(ProducerStats stats) {
         if (stats == null) {
             return;
         }
-        numMsgsSent.add(stats.numMsgsSent.longValue());
-        numBytesSent.add(stats.numBytesSent.longValue());
-        numSendFailed.add(stats.numSendFailed.longValue());
-        numAcksReceived.add(stats.numAcksReceived.longValue());
-        totalMsgsSent.add(stats.numMsgsSent.longValue());
-        totalBytesSent.add(stats.numBytesSent.longValue());
-        totalSendFailed.add(stats.numSendFailed.longValue());
-        totalAcksReceived.add(stats.numAcksReceived.longValue());
+        numMsgsSent.add(stats.getNumMsgsSent());
+        numBytesSent.add(stats.getNumBytesSent());
+        numSendFailed.add(stats.getNumSendFailed());
+        numAcksReceived.add(stats.getNumAcksReceived());
+        totalMsgsSent.add(stats.getNumMsgsSent());
+        totalBytesSent.add(stats.getNumBytesSent());
+        totalSendFailed.add(stats.getNumSendFailed());
+        totalAcksReceived.add(stats.getNumAcksReceived());
     }
 
+    @Override
     public long getNumMsgsSent() {
         return numMsgsSent.longValue();
     }
 
+    @Override
     public long getNumBytesSent() {
         return numBytesSent.longValue();
     }
 
+    @Override
     public long getNumSendFailed() {
         return numSendFailed.longValue();
     }
 
+    @Override
     public long getNumAcksReceived() {
         return numAcksReceived.longValue();
     }
@@ -247,6 +256,46 @@ public long getTotalAcksReceived() {
         return totalAcksReceived.longValue();
     }
 
+    @Override
+    public double getSendMsgsRate() {
+        return sendMsgsRate;
+    }
+
+    @Override
+    public double getSendBytesRate() {
+        return sendBytesRate;
+    }
+
+    @Override
+    public double getSendLatencyMillis50pct() {
+        return latencyPctValues[0];
+    }
+
+    @Override
+    public double getSendLatencyMillis75pct() {
+        return latencyPctValues[1];
+    }
+
+    @Override
+    public double getSendLatencyMillis95pct() {
+        return latencyPctValues[2];
+    }
+
+    @Override
+    public double getSendLatencyMillis99pct() {
+        return latencyPctValues[3];
+    }
+
+    @Override
+    public double getSendLatencyMillis999pct() {
+        return latencyPctValues[4];
+    }
+
+    @Override
+    public double getSendLatencyMillisMax() {
+        return latencyPctValues[5];
+    }
+
     public void cancelStatsTimeout() {
         if (statTimeout != null) {
             statTimeout.cancel();
@@ -254,5 +303,5 @@ public void cancelStatsTimeout() {
         }
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProducerStats.class);
+    private static final Logger log = LoggerFactory.getLogger(ProducerStatsRecorderImpl.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index 929e6f7a9..e8c018cdf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -22,6 +22,7 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +39,9 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -47,14 +50,12 @@
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     // All topics should be in same namespace
@@ -78,7 +79,7 @@
     AtomicInteger numberTopicPartitions;
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ConsumerStats stats;
+    private final ConsumerStatsRecorder stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
@@ -103,7 +104,7 @@
         }
 
         this.internalConfig = getInternalConsumerConfig();
-        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
 
         if (conf.getTopicNames().isEmpty()) {
             this.namespaceName = null;
@@ -670,7 +671,7 @@ private boolean topicNameValid(String topicName) {
                         client.externalExecutorProvider().getExecutor(), 0, subFuture, schema);
                 consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
-                futureList = Lists.newArrayList(subFuture);
+                futureList = Collections.singletonList(subFuture);
             }
 
             FutureUtil.waitForAll(futureList)
@@ -809,7 +810,7 @@ private void handleSubscribeOneTopicError(String topicName, Throwable error) {
     }
 
     // get partitioned consumers
-    public List<ConsumerImpl> getConsumers() {
+    public List<ConsumerImpl<T>> getConsumers() {
         return consumers.values().stream().collect(Collectors.toList());
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services