You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/05 23:44:28 UTC
[incubator-pulsar] branch master updated: Improved API for
producer/consumer stats (#1321)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 97c2b44 Improved API for producer/consumer stats (#1321)
97c2b44 is described below
commit 97c2b44a061221784c453fc6ec31fc1ec1718878
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 5 15:44:27 2018 -0800
Improved API for producer/consumer stats (#1321)
* Improved API for producer/consumer stats
* Fixed compilation after merge
---
.../client/api/SimpleProducerConsumerStatTest.java | 2 -
.../client/impl/PatternTopicsConsumerImplTest.java | 3 +-
.../org/apache/pulsar/client/api/Consumer.java | 29 +++--
.../apache/pulsar/client/api/ConsumerStats.java | 90 ++++++++++++++
.../org/apache/pulsar/client/api/Producer.java | 19 +--
.../apache/pulsar/client/api/ProducerStats.java | 110 +++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 28 ++---
.../pulsar/client/impl/ConsumerStatsDisabled.java | 91 +++++++++++++-
...atsDisabled.java => ConsumerStatsRecorder.java} | 32 +++--
...erStats.java => ConsumerStatsRecorderImpl.java} | 81 ++++++++-----
.../client/impl/PartitionedConsumerImpl.java | 6 +-
.../client/impl/PartitionedProducerImpl.java | 6 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 11 +-
.../pulsar/client/impl/ProducerStatsDisabled.java | 96 ++++++++++++++-
...atsDisabled.java => ProducerStatsRecorder.java} | 25 ++--
...erStats.java => ProducerStatsRecorderImpl.java} | 131 ++++++++++++++-------
.../pulsar/client/impl/TopicsConsumerImpl.java | 15 +--
17 files changed, 599 insertions(+), 176 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 30a2c25..8a536f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -34,8 +34,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.impl.ConsumerStats;
-import org.apache.pulsar.client.impl.ProducerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 10367b8..4f9cc70 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -163,8 +163,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
- @SuppressWarnings("rawtypes")
- List<ConsumerImpl> consumers = ((PatternTopicsConsumerImpl<?>) consumer).getConsumers();
+ List<ConsumerImpl<byte[]>> consumers = ((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
index a1dd628..876b6d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -22,12 +22,8 @@ import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.impl.ConsumerStats;
-
/**
* An interface that abstracts behavior of Pulsar's consumer.
- *
- *
*/
public interface Consumer<T> extends Closeable {
@@ -201,16 +197,22 @@ public interface Consumer<T> extends Closeable {
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
/**
- * Get statistics for the consumer
+ * Get statistics for the consumer.
*
- * numMsgsReceived : Number of messages received in the current interval numBytesReceived : Number of bytes received
- * in the current interval numReceiveFailed : Number of messages failed to receive in the current interval
- * numAcksSent : Number of acks sent in the current interval numAcksFailed : Number of acks failed to send in the
- * current interval totalMsgsReceived : Total number of messages received totalBytesReceived : Total number of bytes
- * received totalReceiveFailed : Total number of messages failed to receive totalAcksSent : Total number of acks
- * sent totalAcksFailed : Total number of acks failed to sent
+ * <ul>
+ * <li>numMsgsReceived : Number of messages received in the current interval
+ * <li>numBytesReceived : Number of bytes received in the current interval
+ * <li>numReceiveFailed : Number of messages failed to receive in the current interval
+ * <li>numAcksSent : Number of acks sent in the current interval
+ * <li>numAcksFailed : Number of acks failed to send in the current interval
+ * <li>totalMsgsReceived : Total number of messages received
+ * <li>totalBytesReceived : Total number of bytes received
+ * <li>totalReceiveFailed : Total number of messages failed to receive
+ * <li>totalAcksSent : Total number of acks sent
+ * <li>totalAcksFailed : Total number of acks failed to send
+ * </ul>
*
- * @return statistic for the consumer or null if ConsumerStats is disabled.
+ * @return statistic for the consumer
*/
ConsumerStats getStats();
@@ -229,6 +231,9 @@ public interface Consumer<T> extends Closeable {
/**
* Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
+ *
+ * Please note that this does not simply mean that the consumer is caught up with the last message published by
+ * producers, rather the topic needs to be explicitly "terminated".
*/
boolean hasReachedEndOfTopic();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
new file mode 100644
index 0000000..5901c2d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+
+/**
+ * Consumer statistics recorded by client.
+ *
+ * All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ */
+public interface ConsumerStats extends Serializable {
+
+ /**
+ * @return Number of messages received in the last interval
+ */
+ long getNumMsgsReceived();
+
+ /**
+ * @return Number of bytes received in the last interval
+ */
+ long getNumBytesReceived();
+
+ /**
+ * @return Rate of messages per second received in the last interval
+ */
+ double getRateMsgsReceived();
+
+ /**
+ * @return Rate of bytes per second received in the last interval
+ */
+ double getRateBytesReceived();
+
+ /**
+ * @return Number of message acknowledgments sent in the last interval
+ */
+ long getNumAcksSent();
+
+ /**
+ * @return Number of message acknowledgments failed in the last interval
+ */
+ long getNumAcksFailed();
+
+ /**
+ * @return Number of message receive failures in the last interval
+ */
+ long getNumReceiveFailed();
+
+ /**
+ * @return Total number of messages received by this consumer
+ */
+ long getTotalMsgsReceived();
+
+ /**
+ * @return Total number of bytes received by this consumer
+ */
+ long getTotalBytesReceived();
+
+ /**
+ * @return Total number of message receive failures
+ */
+ long getTotalReceivedFailed();
+
+ /**
+ * @return Total number of message acknowledgments sent by this consumer
+ */
+ long getTotalAcksSent();
+
+ /**
+ * @return Total number of message acknowledgment failures on this consumer
+ */
+ long getTotalAcksFailed();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
index fe60238..97c5c4e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -21,8 +21,6 @@ package org.apache.pulsar.client.api;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.impl.ProducerStats;
-
/**
* Producer object.
*
@@ -124,13 +122,18 @@ public interface Producer<T> extends Closeable {
/**
* Get statistics for the producer
*
- * numMsgsSent : Number of messages sent in the current interval numBytesSent : Number of bytes sent in the current
- * interval numSendFailed : Number of messages failed to send in the current interval numAcksReceived : Number of
- * acks received in the current interval totalMsgsSent : Total number of messages sent totalBytesSent : Total number
- * of bytes sent totalSendFailed : Total number of messages failed to send totalAcksReceived: Total number of acks
- * received
+ * <ul>
+ * <li>numMsgsSent : Number of messages sent in the current interval
+ * <li>numBytesSent : Number of bytes sent in the current interval
+ * <li>numSendFailed : Number of messages failed to send in the current interval
+ * <li>numAcksReceived : Number of acks received in the current interval
+ * <li>totalMsgsSent : Total number of messages sent
+ * <li>totalBytesSent : Total number of bytes sent
+ * <li>totalSendFailed : Total number of messages failed to send
+ * <li>totalAcksReceived: Total number of acks received
+ * </ul>
*
- * @return statistic for the producer or null if ProducerStats is disabled.
+ * @return statistic for the producer; may be null when stats are disabled (e.g. for partitioned producers)
*/
ProducerStats getStats();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
new file mode 100644
index 0000000..552c61a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+
+/**
+ * Producer statistics recorded by client.
+ *
+ * All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ */
+public interface ProducerStats extends Serializable {
+ /**
+ * @return the number of messages published in the last interval
+ */
+ long getNumMsgsSent();
+
+ /**
+ * @return the number of bytes sent in the last interval
+ */
+ long getNumBytesSent();
+
+ /**
+ * @return the number of failed send operations in the last interval
+ */
+ long getNumSendFailed();
+
+ /**
+ * @return the number of send acknowledgements received from the broker in the last interval
+ */
+ long getNumAcksReceived();
+
+ /**
+ * @return the messages send rate in the last interval
+ */
+ double getSendMsgsRate();
+
+ /**
+ * @return the bytes send rate in the last interval
+ */
+ double getSendBytesRate();
+
+ /**
+ * @return the 50th percentile of the send latency in milliseconds for the last interval
+ */
+ double getSendLatencyMillis50pct();
+
+ /**
+ * @return the 75th percentile of the send latency in milliseconds for the last interval
+ */
+ double getSendLatencyMillis75pct();
+
+ /**
+ * @return the 95th percentile of the send latency in milliseconds for the last interval
+ */
+ double getSendLatencyMillis95pct();
+
+ /**
+ * @return the 99th percentile of the send latency in milliseconds for the last interval
+ */
+ double getSendLatencyMillis99pct();
+
+ /**
+ * @return the 99.9th percentile of the send latency in milliseconds for the last interval
+ */
+ double getSendLatencyMillis999pct();
+
+ /**
+ * @return the max send latency in milliseconds for the last interval
+ */
+ double getSendLatencyMillisMax();
+
+ /**
+ * @return the total number of messages published by this producer
+ */
+ long getTotalMsgsSent();
+
+ /**
+ * @return the total number of bytes sent by this producer
+ */
+ long getTotalBytesSent();
+
+ /**
+ * @return the total number of failed send operations
+ */
+ long getTotalSendFailed();
+
+ /**
+ * @return the total number of send acknowledgements received from the broker
+ */
+ long getTotalAcksReceived();
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 183651a..ab812a0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,11 +25,6 @@ import static java.lang.String.format;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;
-import com.google.common.collect.Iterables;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
@@ -52,8 +47,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -75,6 +72,13 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
public class ConsumerImpl<T> extends ConsumerBase<T> {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
@@ -106,7 +110,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConcurrentNavigableMap<MessageIdImpl, BitSet> batchMessageAckTracker;
- protected final ConsumerStats stats;
+ protected final ConsumerStatsRecorder stats;
private final int priorityLevel;
private final SubscriptionMode subscriptionMode;
private BatchMessageIdImpl startMessageId;
@@ -150,9 +154,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
this.readCompacted = conf.isReadCompacted();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
- stats = new ConsumerStats(client, conf, this);
+ stats = new ConsumerStatsRecorderImpl(client, conf, this);
} else {
- stats = ConsumerStats.CONSUMER_STATS_DISABLED;
+ stats = ConsumerStatsDisabled.INSTANCE;
}
if (conf.getReceiverQueueSize() <= 1) {
@@ -698,10 +702,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
return CompletableFuture.completedFuture(null);
}
- Timeout timeout = stats.getStatTimeout();
- if (timeout != null) {
- timeout.cancel();
- }
+ stats.getStatTimeout().ifPresent(Timeout::cancel);
setState(State.Closing);
@@ -1408,9 +1409,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
@Override
public ConsumerStats getStats() {
- if (stats instanceof ConsumerStatsDisabled) {
- return null;
- }
return stats;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index 08b189c..717acfa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -18,29 +18,110 @@
*/
package org.apache.pulsar.client.impl;
+import java.util.Optional;
+
+import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
-public class ConsumerStatsDisabled extends ConsumerStats {
+import io.netty.util.Timeout;
+
+public class ConsumerStatsDisabled implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
+ static final ConsumerStatsRecorder INSTANCE = new ConsumerStatsDisabled();
+
@Override
- void updateNumMsgsReceived(Message<?> message) {
+ public void updateNumMsgsReceived(Message<?> message) {
// Do nothing
}
@Override
- void incrementNumReceiveFailed() {
+ public void incrementNumReceiveFailed() {
// Do nothing
}
@Override
- void incrementNumAcksSent(long numAcks) {
+ public void incrementNumAcksSent(long numAcks) {
// Do nothing
}
@Override
- void incrementNumAcksFailed() {
+ public void incrementNumAcksFailed() {
// Do nothing
}
+ @Override
+ public long getNumMsgsReceived() {
+ return 0;
+ }
+
+ @Override
+ public long getNumBytesReceived() {
+ return 0;
+ }
+
+ @Override
+ public long getNumAcksSent() {
+ return 0;
+ }
+
+ @Override
+ public long getNumAcksFailed() {
+ return 0;
+ }
+
+ @Override
+ public long getNumReceiveFailed() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalMsgsReceived() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalBytesReceived() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalReceivedFailed() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalAcksSent() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalAcksFailed() {
+ return 0;
+ }
+
+ @Override
+ public double getRateMsgsReceived() {
+ return 0;
+ }
+
+ @Override
+ public double getRateBytesReceived() {
+ return 0;
+ }
+
+ @Override
+ public Optional<Timeout> getStatTimeout() {
+ return Optional.empty();
+ }
+
+ @Override
+ public void reset() {
+ // do nothing
+ }
+
+ @Override
+ public void updateCumulativeStats(ConsumerStats stats) {
+ // do nothing
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
similarity index 66%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
index 08b189c..f29ca09 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java
@@ -18,29 +18,25 @@
*/
package org.apache.pulsar.client.impl;
+import java.util.Optional;
+
+import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
-public class ConsumerStatsDisabled extends ConsumerStats {
- private static final long serialVersionUID = 1L;
+import io.netty.util.Timeout;
+
+public interface ConsumerStatsRecorder extends ConsumerStats {
+ void updateNumMsgsReceived(Message<?> message);
+
+ void incrementNumAcksSent(long numAcks);
- @Override
- void updateNumMsgsReceived(Message<?> message) {
- // Do nothing
- }
+ void incrementNumAcksFailed();
- @Override
- void incrementNumReceiveFailed() {
- // Do nothing
- }
+ void incrementNumReceiveFailed();
- @Override
- void incrementNumAcksSent(long numAcks) {
- // Do nothing
- }
+ Optional<Timeout> getStatTimeout();
- @Override
- void incrementNumAcksFailed() {
- // Do nothing
- }
+ void reset();
+ void updateCumulativeStats(ConsumerStats stats);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
similarity index 76%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 21ff3c4..01c51cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -19,11 +19,12 @@
package org.apache.pulsar.client.impl;
import java.io.IOException;
-import java.io.Serializable;
import java.text.DecimalFormat;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
@@ -36,7 +37,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-public class ConsumerStats implements Serializable {
+public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
private TimerTask stat;
@@ -56,11 +57,12 @@ public class ConsumerStats implements Serializable {
private final LongAdder totalAcksSent;
private final LongAdder totalAcksFailed;
- private final DecimalFormat throughputFormat;
+ private volatile double receivedMsgsRate;
+ private volatile double receivedBytesRate;
- public static final ConsumerStats CONSUMER_STATS_DISABLED = new ConsumerStatsDisabled();
+ private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
- public ConsumerStats() {
+ public ConsumerStatsRecorderImpl() {
numMsgsReceived = null;
numBytesReceived = null;
numReceiveFailed = null;
@@ -71,10 +73,10 @@ public class ConsumerStats implements Serializable {
totalReceiveFailed = null;
totalAcksSent = null;
totalAcksFailed = null;
- throughputFormat = null;
}
- public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf, ConsumerImpl<?> consumer) {
+ public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf,
+ ConsumerImpl<?> consumer) {
this.pulsarClient = pulsarClient;
this.consumer = consumer;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -88,7 +90,6 @@ public class ConsumerStats implements Serializable {
totalReceiveFailed = new LongAdder();
totalAcksSent = new LongAdder();
totalAcksFailed = new LongAdder();
- throughputFormat = new DecimalFormat("0.00");
init(conf);
}
@@ -124,6 +125,9 @@ public class ConsumerStats implements Serializable {
totalAcksSent.add(currentNumAcksSent);
totalAcksFailed.add(currentNumAcksFailed);
+ receivedMsgsRate = currentNumMsgsReceived / elapsed;
+ receivedBytesRate = currentNumBytesReceived / elapsed;
+
if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent
| currentNumAcksFailed) != 0) {
log.info(
@@ -131,9 +135,9 @@ public class ConsumerStats implements Serializable {
+ "Throughput received: {} msg/s --- {} Mbit/s --- "
+ "Ack sent rate: {} ack/s --- " + "Failed messages: {} --- " + "Failed acks: {}",
consumer.getTopic(), consumer.getSubscription(), consumer.consumerName,
- consumer.incomingMessages.size(), throughputFormat.format(currentNumMsgsReceived / elapsed),
- throughputFormat.format(currentNumBytesReceived / elapsed * 8 / 1024 / 1024),
- throughputFormat.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
+ consumer.incomingMessages.size(), THROUGHPUT_FORMAT.format(receivedMsgsRate),
+ THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024),
+ THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
currentNumAcksFailed);
}
} catch (Exception e) {
@@ -149,30 +153,36 @@ public class ConsumerStats implements Serializable {
statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS);
}
- void updateNumMsgsReceived(Message<?> message) {
+ @Override
+ public void updateNumMsgsReceived(Message<?> message) {
if (message != null) {
numMsgsReceived.increment();
numBytesReceived.add(message.getData().length);
}
}
- void incrementNumAcksSent(long numAcks) {
+ @Override
+ public void incrementNumAcksSent(long numAcks) {
numAcksSent.add(numAcks);
}
- void incrementNumAcksFailed() {
+ @Override
+ public void incrementNumAcksFailed() {
numAcksFailed.increment();
}
- void incrementNumReceiveFailed() {
+ @Override
+ public void incrementNumReceiveFailed() {
numReceiveFailed.increment();
}
- Timeout getStatTimeout() {
- return statTimeout;
+ @Override
+ public Optional<Timeout> getStatTimeout() {
+ return Optional.ofNullable(statTimeout);
}
- void reset() {
+ @Override
+ public void reset() {
numMsgsReceived.reset();
numBytesReceived.reset();
numReceiveFailed.reset();
@@ -185,20 +195,21 @@ public class ConsumerStats implements Serializable {
totalAcksFailed.reset();
}
- void updateCumulativeStats(ConsumerStats stats) {
+ @Override
+ public void updateCumulativeStats(ConsumerStats stats) {
if (stats == null) {
return;
}
- numMsgsReceived.add(stats.numMsgsReceived.longValue());
- numBytesReceived.add(stats.numBytesReceived.longValue());
- numReceiveFailed.add(stats.numReceiveFailed.longValue());
- numAcksSent.add(stats.numAcksSent.longValue());
- numAcksFailed.add(stats.numAcksFailed.longValue());
- totalMsgsReceived.add(stats.totalMsgsReceived.longValue());
- totalBytesReceived.add(stats.totalBytesReceived.longValue());
- totalReceiveFailed.add(stats.totalReceiveFailed.longValue());
- totalAcksSent.add(stats.totalAcksSent.longValue());
- totalAcksFailed.add(stats.totalAcksFailed.longValue());
+ numMsgsReceived.add(stats.getNumMsgsReceived());
+ numBytesReceived.add(stats.getNumBytesReceived());
+ numReceiveFailed.add(stats.getNumReceiveFailed());
+ numAcksSent.add(stats.getNumAcksSent());
+ numAcksFailed.add(stats.getNumAcksFailed());
+ totalMsgsReceived.add(stats.getTotalMsgsReceived());
+ totalBytesReceived.add(stats.getTotalBytesReceived());
+ totalReceiveFailed.add(stats.getTotalReceivedFailed());
+ totalAcksSent.add(stats.getTotalAcksSent());
+ totalAcksFailed.add(stats.getTotalAcksFailed());
}
public long getNumMsgsReceived() {
@@ -241,5 +252,15 @@ public class ConsumerStats implements Serializable {
return totalAcksFailed.longValue();
}
- private static final Logger log = LoggerFactory.getLogger(ConsumerStats.class);
+ @Override
+ public double getRateMsgsReceived() {
+ return receivedMsgsRate;
+ }
+
+ @Override
+ public double getRateBytesReceived() {
+ return receivedBytesRate;
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerStatsRecorderImpl.class);
}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index df6894c..f46c2b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -63,7 +63,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
private final int numPartitions;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final ConsumerStats stats;
+ private final ConsumerStatsRecorderImpl stats;
private final UnAckedMessageTracker unAckedMessageTracker;
PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, int numPartitions,
@@ -81,7 +81,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
- stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+ stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Partitioned Topics");
start();
@@ -530,7 +530,7 @@ public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
}
@Override
- public synchronized ConsumerStats getStats() {
+ public synchronized ConsumerStatsRecorderImpl getStats() {
if (stats == null) {
return null;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5281d51..23a86e6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -47,7 +47,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
private List<ProducerImpl<T>> producers;
private MessageRouter routerPolicy;
- private final ProducerStats stats;
+ private final ProducerStatsRecorderImpl stats;
private final TopicMetadata topicMetadata;
public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
@@ -56,7 +56,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
this.producers = Lists.newArrayListWithCapacity(numPartitions);
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
- stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStats() : null;
+ stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
@@ -204,7 +204,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
}
@Override
- public synchronized ProducerStats getStats() {
+ public synchronized ProducerStatsRecorderImpl getStats() {
if (stats == null) {
return null;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 15779cd..8f0586b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -93,7 +93,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
private String connectedSince;
private final int partitionIndex;
- private final ProducerStats stats;
+ private final ProducerStatsRecorder stats;
private final CompressionCodec compressor;
@@ -161,9 +161,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
this.batchMessageContainer = null;
}
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
- stats = new ProducerStats(client, conf, this);
+ stats = new ProducerStatsRecorderImpl(client, conf, this);
} else {
- stats = ProducerStats.PRODUCER_STATS_DISABLED;
+ stats = ProducerStatsDisabled.INSTANCE;
}
if (conf.getProperties().isEmpty()) {
@@ -1240,10 +1240,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
}
@Override
- public ProducerStats getStats() {
- if (stats instanceof ProducerStatsDisabled) {
- return null;
- }
+ public ProducerStatsRecorder getStats() {
return stats;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
index 3c80f62..214f46c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
@@ -18,27 +18,113 @@
*/
package org.apache.pulsar.client.impl;
-public class ProducerStatsDisabled extends ProducerStats {
+public class ProducerStatsDisabled implements ProducerStatsRecorder {
private static final long serialVersionUID = 1L;
+ static final ProducerStatsRecorder INSTANCE = new ProducerStatsDisabled();
+
@Override
- void incrementSendFailed() {
+ public void incrementSendFailed() {
// Do nothing
}
@Override
- void incrementSendFailed(long numMsgs) {
+ public void incrementSendFailed(long numMsgs) {
// Do nothing
}
@Override
- void incrementNumAcksReceived(long latencyNs) {
+ public void incrementNumAcksReceived(long latencyNs) {
// Do nothing
}
@Override
- void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
+ public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
// Do nothing
}
+ @Override
+ public void cancelStatsTimeout() {
+ // Do nothing
+ }
+
+ @Override
+ public long getNumMsgsSent() {
+ return 0;
+ }
+
+ @Override
+ public long getNumBytesSent() {
+ return 0;
+ }
+
+ @Override
+ public long getNumSendFailed() {
+ return 0;
+ }
+
+ @Override
+ public long getNumAcksReceived() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalMsgsSent() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalBytesSent() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalSendFailed() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalAcksReceived() {
+ return 0;
+ }
+
+ @Override
+ public double getSendMsgsRate() {
+ return 0;
+ }
+
+ @Override
+ public double getSendBytesRate() {
+ return 0;
+ }
+
+ @Override
+ public double getSendLatencyMillis50pct() {
+ return 0;
+ }
+
+ @Override
+ public double getSendLatencyMillis75pct() {
+ return 0;
+ }
+
+ @Override
+ public double getSendLatencyMillis95pct() {
+ return 0;
+ }
+
+ @Override
+ public double getSendLatencyMillis99pct() {
+ return 0;
+ }
+
+ @Override
+ public double getSendLatencyMillis999pct() {
+ return 0;
+ }
+
+ @Override
+ public double getSendLatencyMillisMax() {
+ return 0;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
similarity index 64%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
index 3c80f62..4c9bd9e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorder.java
@@ -18,27 +18,16 @@
*/
package org.apache.pulsar.client.impl;
-public class ProducerStatsDisabled extends ProducerStats {
- private static final long serialVersionUID = 1L;
+import org.apache.pulsar.client.api.ProducerStats;
- @Override
- void incrementSendFailed() {
- // Do nothing
- }
+public interface ProducerStatsRecorder extends ProducerStats {
+ void updateNumMsgsSent(long numMsgs, long totalMsgsSize);
- @Override
- void incrementSendFailed(long numMsgs) {
- // Do nothing
- }
+ void incrementSendFailed();
- @Override
- void incrementNumAcksReceived(long latencyNs) {
- // Do nothing
- }
+ void incrementSendFailed(long numMsgs);
- @Override
- void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
- // Do nothing
- }
+ void incrementNumAcksReceived(long latencyNs);
+ void cancelStatsTimeout();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
similarity index 65%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 2628003..6bea137 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -19,11 +19,11 @@
package org.apache.pulsar.client.impl;
import java.io.IOException;
-import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@ import com.yahoo.sketches.quantiles.DoublesSketch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-public class ProducerStats implements Serializable {
+public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private static final long serialVersionUID = 1L;
private TimerTask stat;
@@ -53,14 +53,17 @@ public class ProducerStats implements Serializable {
private final LongAdder totalBytesSent;
private final LongAdder totalSendFailed;
private final LongAdder totalAcksReceived;
- private final DecimalFormat dec;
- private final DecimalFormat throughputFormat;
+ private static final DecimalFormat DEC = new DecimalFormat("0.000");
+ private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
private final DoublesSketch ds;
- private final double[] percentiles = { 0.5, 0.95, 0.99, 0.999, 0.9999 };
- public static final ProducerStats PRODUCER_STATS_DISABLED = new ProducerStatsDisabled();
+ private volatile double sendMsgsRate;
+ private volatile double sendBytesRate;
+ private volatile double[] latencyPctValues;
- public ProducerStats() {
+ private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 0.999, 1.0 };
+
+ public ProducerStatsRecorderImpl() {
numMsgsSent = null;
numBytesSent = null;
numSendFailed = null;
@@ -69,12 +72,11 @@ public class ProducerStats implements Serializable {
totalBytesSent = null;
totalSendFailed = null;
totalAcksReceived = null;
- dec = null;
- throughputFormat = null;
ds = null;
}
- public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl<?> producer) {
+ public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, ProducerConfigurationData conf,
+ ProducerImpl<?> producer) {
this.pulsarClient = pulsarClient;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
this.producer = producer;
@@ -87,8 +89,6 @@ public class ProducerStats implements Serializable {
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
ds = DoublesSketch.builder().build(256);
- dec = new DecimalFormat("0.000");
- throughputFormat = new DecimalFormat("0.00");
init(conf);
}
@@ -125,32 +125,33 @@ public class ProducerStats implements Serializable {
totalSendFailed.add(currentNumSendFailedMsgs);
totalAcksReceived.add(currentNumAcksReceived);
- double[] percentileValues;
synchronized (ds) {
- percentileValues = ds.getQuantiles(percentiles);
+ latencyPctValues = ds.getQuantiles(PERCENTILES);
ds.reset();
}
+ sendMsgsRate = currentNumMsgsSent / elapsed;
+ sendBytesRate = currentNumBytesSent / elapsed;
+
if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
| currentNumMsgsSent) != 0) {
- for (int i = 0; i < percentileValues.length; i++) {
- if (percentileValues[i] == Double.NaN) {
- percentileValues[i] = 0;
+ for (int i = 0; i < latencyPctValues.length; i++) {
+ if (latencyPctValues[i] == Double.NaN) {
+ latencyPctValues[i] = 0;
}
}
- log.info(
- "[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
- + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms --- "
- + "Ack received rate: {} ack/s --- Failed messages: {}",
- producer.getTopic(), producer.getProducerName(), producer.getPendingQueueSize(),
- throughputFormat.format(currentNumMsgsSent / elapsed),
- throughputFormat.format(currentNumBytesSent / elapsed / 1024 / 1024 * 8),
- dec.format(percentileValues[0] / 1000.0), dec.format(percentileValues[1] / 1000.0),
- dec.format(percentileValues[2] / 1000.0), dec.format(percentileValues[3] / 1000.0),
- dec.format(percentileValues[4] / 1000.0),
- throughputFormat.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
+ log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
+ + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
+ + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
+ producer.getProducerName(), producer.getPendingQueueSize(),
+ THROUGHPUT_FORMAT.format(sendMsgsRate),
+ THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
+ DEC.format(latencyPctValues[0] / 1000.0), DEC.format(latencyPctValues[2] / 1000.0),
+ DEC.format(latencyPctValues[3] / 1000.0), DEC.format(latencyPctValues[4] / 1000.0),
+ DEC.format(latencyPctValues[5] / 1000.0),
+ THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
}
} catch (Exception e) {
@@ -170,20 +171,24 @@ public class ProducerStats implements Serializable {
return statTimeout;
}
- void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
+ @Override
+ public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
numMsgsSent.add(numMsgs);
numBytesSent.add(totalMsgsSize);
}
- void incrementSendFailed() {
+ @Override
+ public void incrementSendFailed() {
numSendFailed.increment();
}
- void incrementSendFailed(long numMsgs) {
+ @Override
+ public void incrementSendFailed(long numMsgs) {
numSendFailed.add(numMsgs);
}
- void incrementNumAcksReceived(long latencyNs) {
+ @Override
+ public void incrementNumAcksReceived(long latencyNs) {
numAcksReceived.increment();
synchronized (ds) {
ds.update(TimeUnit.NANOSECONDS.toMicros(latencyNs));
@@ -205,28 +210,32 @@ public class ProducerStats implements Serializable {
if (stats == null) {
return;
}
- numMsgsSent.add(stats.numMsgsSent.longValue());
- numBytesSent.add(stats.numBytesSent.longValue());
- numSendFailed.add(stats.numSendFailed.longValue());
- numAcksReceived.add(stats.numAcksReceived.longValue());
- totalMsgsSent.add(stats.numMsgsSent.longValue());
- totalBytesSent.add(stats.numBytesSent.longValue());
- totalSendFailed.add(stats.numSendFailed.longValue());
- totalAcksReceived.add(stats.numAcksReceived.longValue());
+ numMsgsSent.add(stats.getNumMsgsSent());
+ numBytesSent.add(stats.getNumBytesSent());
+ numSendFailed.add(stats.getNumSendFailed());
+ numAcksReceived.add(stats.getNumAcksReceived());
+ totalMsgsSent.add(stats.getNumMsgsSent());
+ totalBytesSent.add(stats.getNumBytesSent());
+ totalSendFailed.add(stats.getNumSendFailed());
+ totalAcksReceived.add(stats.getNumAcksReceived());
}
+ @Override
public long getNumMsgsSent() {
return numMsgsSent.longValue();
}
+ @Override
public long getNumBytesSent() {
return numBytesSent.longValue();
}
+ @Override
public long getNumSendFailed() {
return numSendFailed.longValue();
}
+ @Override
public long getNumAcksReceived() {
return numAcksReceived.longValue();
}
@@ -247,6 +256,46 @@ public class ProducerStats implements Serializable {
return totalAcksReceived.longValue();
}
+ @Override
+ public double getSendMsgsRate() {
+ return sendMsgsRate;
+ }
+
+ @Override
+ public double getSendBytesRate() {
+ return sendBytesRate;
+ }
+
+ @Override
+ public double getSendLatencyMillis50pct() {
+ return latencyPctValues[0];
+ }
+
+ @Override
+ public double getSendLatencyMillis75pct() {
+ return latencyPctValues[1];
+ }
+
+ @Override
+ public double getSendLatencyMillis95pct() {
+ return latencyPctValues[2];
+ }
+
+ @Override
+ public double getSendLatencyMillis99pct() {
+ return latencyPctValues[3];
+ }
+
+ @Override
+ public double getSendLatencyMillis999pct() {
+ return latencyPctValues[4];
+ }
+
+ @Override
+ public double getSendLatencyMillisMax() {
+ return latencyPctValues[5];
+ }
+
public void cancelStatsTimeout() {
if (statTimeout != null) {
statTimeout.cancel();
@@ -254,5 +303,5 @@ public class ProducerStats implements Serializable {
}
}
- private static final Logger log = LoggerFactory.getLogger(ProducerStats.class);
+ private static final Logger log = LoggerFactory.getLogger(ProducerStatsRecorderImpl.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index 929e6f7..e8c018c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -38,7 +39,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -47,14 +50,12 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
// All topics should be in same namespace
@@ -78,7 +79,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
AtomicInteger numberTopicPartitions;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final ConsumerStats stats;
+ private final ConsumerStatsRecorder stats;
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
@@ -103,7 +104,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
}
this.internalConfig = getInternalConsumerConfig();
- this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+ this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
if (conf.getTopicNames().isEmpty()) {
this.namespaceName = null;
@@ -670,7 +671,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
client.externalExecutorProvider().getExecutor(), 0, subFuture, schema);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
- futureList = Lists.newArrayList(subFuture);
+ futureList = Collections.singletonList(subFuture);
}
FutureUtil.waitForAll(futureList)
@@ -809,7 +810,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
}
// get partitioned consumers
- public List<ConsumerImpl> getConsumers() {
+ public List<ConsumerImpl<T>> getConsumers() {
return consumers.values().stream().collect(Collectors.toList());
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.