You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/28 11:42:30 UTC
[rocketmq-clients] 02/03: Java: adapt with the latest metrics standard
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 1d3a666cb83423e798112aa83e044b732077f7fe
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Jun 28 17:09:12 2022 +0800
Java: adapt with the latest metrics standard
---
...cketmqAttributes.java => InvocationStatus.java} | 16 ++++---
.../rocketmq/client/java/metrics/MessageMeter.java | 31 +++-----------
.../{RocketmqAttributes.java => MetricLabels.java} | 5 ++-
.../java/metrics/MetricMessageInterceptor.java | 50 +++++++++-------------
.../rocketmq/client/java/metrics/MetricName.java | 29 +++++++------
5 files changed, 54 insertions(+), 77 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/InvocationStatus.java
similarity index 66%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/InvocationStatus.java
index d7a2d28..6786f96 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/InvocationStatus.java
@@ -17,15 +17,17 @@
package org.apache.rocketmq.client.java.metrics;
-import static io.opentelemetry.api.common.AttributeKey.stringKey;
+public enum InvocationStatus {
+ SUCCESS("success"),
+ FAILURE("failure");
-import io.opentelemetry.api.common.AttributeKey;
+ private final String name;
-public class RocketmqAttributes {
- public static final AttributeKey<String> TOPIC = stringKey("topic");
- public static final AttributeKey<String> CLIENT_ID = stringKey("client_id");
- public static final AttributeKey<String> CONSUMER_GROUP = stringKey("consumer_group");
+ InvocationStatus(String name) {
+ this.name = name;
+ }
- private RocketmqAttributes() {
+ public String getName() {
+ return name;
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 06f65fb..7025a15 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -24,7 +24,6 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
-import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
@@ -65,12 +64,10 @@ public class MessageMeter {
private volatile MessageCacheObserver messageCacheObserver;
- private final ConcurrentMap<MetricName, LongCounter> counterMap;
private final ConcurrentMap<MetricName, DoubleHistogram> histogramMap;
public MessageMeter(ClientImpl client) {
this.client = client;
- this.counterMap = new ConcurrentHashMap<>();
this.histogramMap = new ConcurrentHashMap<>();
this.client.registerMessageInterceptor(new MetricMessageInterceptor(this));
this.messageCacheObserver = null;
@@ -80,26 +77,11 @@ public class MessageMeter {
this.messageCacheObserver = messageCacheObserver;
}
- LongCounter getSendFailureTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL,
- name -> meter.counterBuilder(name.getName()).build());
- }
-
DoubleHistogram getSendSuccessCostTimeHistogram() {
return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME,
name -> meter.histogramBuilder(name.getName()).build());
}
- LongCounter getProcessSuccessTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL,
- name -> meter.counterBuilder(name.getName()).build());
- }
-
- LongCounter getProcessFailureTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL,
- name -> meter.counterBuilder(name.getName()).build());
- }
-
DoubleHistogram getProcessCostTimeHistogram() {
return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME,
name -> meter.histogramBuilder(name.getName()).build());
@@ -223,7 +205,6 @@ public class MessageMeter {
}
private void reset() {
- counterMap.clear();
histogramMap.clear();
meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
final Optional<String> optionalConsumerGroup = tryGetConsumerGroup();
@@ -235,9 +216,9 @@ public class MessageMeter {
final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
final String topic = entry.getKey();
- Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, topic)
- .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
- .put(RocketmqAttributes.CLIENT_ID, client.getClientId()).build();
+ Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
measurement.record(entry.getValue(), attributes);
}
});
@@ -251,9 +232,9 @@ public class MessageMeter {
final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
final String topic = entry.getKey();
- Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, topic)
- .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
- .put(RocketmqAttributes.CLIENT_ID, client.getClientId()).build();
+ Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
measurement.record(entry.getValue(), attributes);
}
});
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricLabels.java
similarity index 88%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricLabels.java
index d7a2d28..c676ca1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricLabels.java
@@ -21,11 +21,12 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
import io.opentelemetry.api.common.AttributeKey;
-public class RocketmqAttributes {
+public class MetricLabels {
public static final AttributeKey<String> TOPIC = stringKey("topic");
public static final AttributeKey<String> CLIENT_ID = stringKey("client_id");
public static final AttributeKey<String> CONSUMER_GROUP = stringKey("consumer_group");
+ public static final AttributeKey<String> INVOCATION_STATUS = stringKey("invocation_status");
- private RocketmqAttributes() {
+ private MetricLabels() {
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 9beb3e2..62fbe55 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -21,7 +21,6 @@ import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
-import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import java.time.Duration;
import java.util.List;
@@ -46,17 +45,15 @@ public class MetricMessageInterceptor implements MessageInterceptor {
private void doAfterSendMessage(List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- final LongCounter sendFailureCounter = messageMeter.getSendFailureTotalCounter();
final DoubleHistogram costTimeHistogram = messageMeter.getSendSuccessCostTimeHistogram();
for (MessageCommon messageCommon : messageCommons) {
- Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
- .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
- if (MessageHookPointsStatus.OK.equals(status)) {
- costTimeHistogram.record(duration.toMillis(), attributes);
- }
- if (MessageHookPointsStatus.ERROR.equals(status)) {
- sendFailureCounter.add(1, attributes);
- }
+ InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
+ InvocationStatus.FAILURE;
+ Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+ .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+ .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
+ .build();
+ costTimeHistogram.record(duration.toMillis(), attributes);
}
}
@@ -81,9 +78,9 @@ public class MetricMessageInterceptor implements MessageInterceptor {
final Timestamp deliveryTimestampFromRemote = optionalDeliveryTimestampFromRemote.get();
final long latency = System.currentTimeMillis() - Timestamps.toMillis(deliveryTimestampFromRemote);
final DoubleHistogram messageDeliveryLatencyHistogram = messageMeter.getMessageDeliveryLatencyHistogram();
- final Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
- .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
- .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
+ final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
messageDeliveryLatencyHistogram.record(latency, attributes);
}
@@ -100,17 +97,15 @@ public class MetricMessageInterceptor implements MessageInterceptor {
return;
}
final Duration durationAfterDecoding = optionalDurationAfterDecoding.get();
- Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
- .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
- .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
+ Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
final DoubleHistogram histogram = messageMeter.getMessageAwaitTimeHistogram();
histogram.record(durationAfterDecoding.toMillis(), attributes);
}
private void doAfterProcessMessage(List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- final LongCounter processSuccessCounter = messageMeter.getProcessSuccessTotalCounter();
- final LongCounter processFailureCounter = messageMeter.getProcessFailureTotalCounter();
final DoubleHistogram processCostTimeHistogram = messageMeter.getProcessCostTimeHistogram();
final ClientImpl client = messageMeter.getClient();
if (!(client instanceof PushConsumer)) {
@@ -120,17 +115,14 @@ public class MetricMessageInterceptor implements MessageInterceptor {
}
PushConsumer pushConsumer = (PushConsumer) client;
for (MessageCommon messageCommon : messageCommons) {
- Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
- .put(RocketmqAttributes.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
- .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
- if (MessageHookPointsStatus.OK.equals(status)) {
- processSuccessCounter.add(1, attributes);
- processCostTimeHistogram.record(duration.toMillis(), attributes);
- }
- if (MessageHookPointsStatus.ERROR.equals(status)) {
- processFailureCounter.add(1, attributes);
- processCostTimeHistogram.record(duration.toMillis(), attributes);
- }
+ InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
+ InvocationStatus.FAILURE;
+ Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+ .put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
+ .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+ .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
+ .build();
+ processCostTimeHistogram.record(duration.toMillis(), attributes);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
index a4e984f..caf2ef2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
@@ -18,40 +18,41 @@
package org.apache.rocketmq.client.java.metrics;
public enum MetricName {
- /**
- * A counter that records the number of failed api calls of message publishing.
- */
- SEND_FAILURE_TOTAL("rocketmq_send_failure_total"),
/**
* A histogram that records the cost time of api calls of message publishing, both successful and failed.
+ *
+ * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#INVOCATION_STATUS}.
*/
- SEND_SUCCESS_COST_TIME("rocketmq_send_success_cost_time"),
- /**
- * A counter that records the number of successful consumption of message.
- */
- PROCESS_SUCCESS_TOTAL("rocketmq_process_success_total"),
- /**
- * A counter that records the number of failed consumption of message.
- */
- PROCESS_FAILURE_TOTAL("rocketmq_process_failure_total"),
+ SEND_SUCCESS_COST_TIME("rocketmq_send_cost_time"),
/**
* A gauge that records the cached message count of push consumer.
+ *
+ * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
*/
CONSUMER_CACHED_MESSAGES("rocketmq_consumer_cached_messages"),
/**
* A gauge that records the cached message bytes of push consumer.
+ *
+ * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
*/
CONSUMER_CACHED_BYTES("rocketmq_consumer_cached_bytes"),
/**
* A histogram that records the latency of message delivery from remote.
+ *
+ * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
*/
DELIVERY_LATENCY("rocketmq_delivery_latency"),
/**
* A histogram that records await time of message consumption.
+ *
+ * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
*/
AWAIT_TIME("rocketmq_await_time"),
/**
- * A counter that records the process time of message consumption.
+ * A histogram that records the process time of message consumption.
+ *
+ * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP},
+ * {@link MetricLabels#INVOCATION_STATUS}.
*/
PROCESS_TIME("rocketmq_process_time");