You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/28 11:42:30 UTC

[rocketmq-clients] 02/03: Java: adapt with the latest metrics standard

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 1d3a666cb83423e798112aa83e044b732077f7fe
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Jun 28 17:09:12 2022 +0800

    Java: adapt with the latest metrics standard
---
 ...cketmqAttributes.java => InvocationStatus.java} | 16 ++++---
 .../rocketmq/client/java/metrics/MessageMeter.java | 31 +++-----------
 .../{RocketmqAttributes.java => MetricLabels.java} |  5 ++-
 .../java/metrics/MetricMessageInterceptor.java     | 50 +++++++++-------------
 .../rocketmq/client/java/metrics/MetricName.java   | 29 +++++++------
 5 files changed, 54 insertions(+), 77 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/InvocationStatus.java
similarity index 66%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/InvocationStatus.java
index d7a2d28..6786f96 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/InvocationStatus.java
@@ -17,15 +17,17 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
-import static io.opentelemetry.api.common.AttributeKey.stringKey;
+public enum InvocationStatus {
+    SUCCESS("success"),
+    FAILURE("failure");
 
-import io.opentelemetry.api.common.AttributeKey;
+    private final String name;
 
-public class RocketmqAttributes {
-    public static final AttributeKey<String> TOPIC = stringKey("topic");
-    public static final AttributeKey<String> CLIENT_ID = stringKey("client_id");
-    public static final AttributeKey<String> CONSUMER_GROUP = stringKey("consumer_group");
+    InvocationStatus(String name) {
+        this.name = name;
+    }
 
-    private RocketmqAttributes() {
+    public String getName() {
+        return name;
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 06f65fb..7025a15 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -24,7 +24,6 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.DoubleHistogram;
-import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
@@ -65,12 +64,10 @@ public class MessageMeter {
 
     private volatile MessageCacheObserver messageCacheObserver;
 
-    private final ConcurrentMap<MetricName, LongCounter> counterMap;
     private final ConcurrentMap<MetricName, DoubleHistogram> histogramMap;
 
     public MessageMeter(ClientImpl client) {
         this.client = client;
-        this.counterMap = new ConcurrentHashMap<>();
         this.histogramMap = new ConcurrentHashMap<>();
         this.client.registerMessageInterceptor(new MetricMessageInterceptor(this));
         this.messageCacheObserver = null;
@@ -80,26 +77,11 @@ public class MessageMeter {
         this.messageCacheObserver = messageCacheObserver;
     }
 
-    LongCounter getSendFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL,
-            name -> meter.counterBuilder(name.getName()).build());
-    }
-
     DoubleHistogram getSendSuccessCostTimeHistogram() {
         return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME,
             name -> meter.histogramBuilder(name.getName()).build());
     }
 
-    LongCounter getProcessSuccessTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL,
-            name -> meter.counterBuilder(name.getName()).build());
-    }
-
-    LongCounter getProcessFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL,
-            name -> meter.counterBuilder(name.getName()).build());
-    }
-
     DoubleHistogram getProcessCostTimeHistogram() {
         return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME,
             name -> meter.histogramBuilder(name.getName()).build());
@@ -223,7 +205,6 @@ public class MessageMeter {
     }
 
     private void reset() {
-        counterMap.clear();
         histogramMap.clear();
         meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
             final Optional<String> optionalConsumerGroup = tryGetConsumerGroup();
@@ -235,9 +216,9 @@ public class MessageMeter {
             final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
             for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
                 final String topic = entry.getKey();
-                Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, topic)
-                    .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
-                    .put(RocketmqAttributes.CLIENT_ID, client.getClientId()).build();
+                Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+                    .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+                    .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
                 measurement.record(entry.getValue(), attributes);
             }
         });
@@ -251,9 +232,9 @@ public class MessageMeter {
             final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
             for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
                 final String topic = entry.getKey();
-                Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, topic)
-                    .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
-                    .put(RocketmqAttributes.CLIENT_ID, client.getClientId()).build();
+                Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+                    .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+                    .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
                 measurement.record(entry.getValue(), attributes);
             }
         });
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricLabels.java
similarity index 88%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricLabels.java
index d7a2d28..c676ca1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricLabels.java
@@ -21,11 +21,12 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
 
 import io.opentelemetry.api.common.AttributeKey;
 
-public class RocketmqAttributes {
+public class MetricLabels {
     public static final AttributeKey<String> TOPIC = stringKey("topic");
     public static final AttributeKey<String> CLIENT_ID = stringKey("client_id");
     public static final AttributeKey<String> CONSUMER_GROUP = stringKey("consumer_group");
+    public static final AttributeKey<String> INVOCATION_STATUS = stringKey("invocation_status");
 
-    private RocketmqAttributes() {
+    private MetricLabels() {
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 9beb3e2..62fbe55 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -21,7 +21,6 @@ import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.DoubleHistogram;
-import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.Meter;
 import java.time.Duration;
 import java.util.List;
@@ -46,17 +45,15 @@ public class MetricMessageInterceptor implements MessageInterceptor {
 
     private void doAfterSendMessage(List<MessageCommon> messageCommons, Duration duration,
         MessageHookPointsStatus status) {
-        final LongCounter sendFailureCounter = messageMeter.getSendFailureTotalCounter();
         final DoubleHistogram costTimeHistogram = messageMeter.getSendSuccessCostTimeHistogram();
         for (MessageCommon messageCommon : messageCommons) {
-            Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
-                .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
-            if (MessageHookPointsStatus.OK.equals(status)) {
-                costTimeHistogram.record(duration.toMillis(), attributes);
-            }
-            if (MessageHookPointsStatus.ERROR.equals(status)) {
-                sendFailureCounter.add(1, attributes);
-            }
+            InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
+                InvocationStatus.FAILURE;
+            Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+                .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+                .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
+                .build();
+            costTimeHistogram.record(duration.toMillis(), attributes);
         }
     }
 
@@ -81,9 +78,9 @@ public class MetricMessageInterceptor implements MessageInterceptor {
         final Timestamp deliveryTimestampFromRemote = optionalDeliveryTimestampFromRemote.get();
         final long latency = System.currentTimeMillis() - Timestamps.toMillis(deliveryTimestampFromRemote);
         final DoubleHistogram messageDeliveryLatencyHistogram = messageMeter.getMessageDeliveryLatencyHistogram();
-        final Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
-            .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
-            .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
+        final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+            .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+            .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
         messageDeliveryLatencyHistogram.record(latency, attributes);
     }
 
@@ -100,17 +97,15 @@ public class MetricMessageInterceptor implements MessageInterceptor {
             return;
         }
         final Duration durationAfterDecoding = optionalDurationAfterDecoding.get();
-        Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
-            .put(RocketmqAttributes.CONSUMER_GROUP, consumerGroup)
-            .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
+        Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+            .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+            .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
         final DoubleHistogram histogram = messageMeter.getMessageAwaitTimeHistogram();
         histogram.record(durationAfterDecoding.toMillis(), attributes);
     }
 
     private void doAfterProcessMessage(List<MessageCommon> messageCommons, Duration duration,
         MessageHookPointsStatus status) {
-        final LongCounter processSuccessCounter = messageMeter.getProcessSuccessTotalCounter();
-        final LongCounter processFailureCounter = messageMeter.getProcessFailureTotalCounter();
         final DoubleHistogram processCostTimeHistogram = messageMeter.getProcessCostTimeHistogram();
         final ClientImpl client = messageMeter.getClient();
         if (!(client instanceof PushConsumer)) {
@@ -120,17 +115,14 @@ public class MetricMessageInterceptor implements MessageInterceptor {
         }
         PushConsumer pushConsumer = (PushConsumer) client;
         for (MessageCommon messageCommon : messageCommons) {
-            Attributes attributes = Attributes.builder().put(RocketmqAttributes.TOPIC, messageCommon.getTopic())
-                .put(RocketmqAttributes.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
-                .put(RocketmqAttributes.CLIENT_ID, messageMeter.getClient().getClientId()).build();
-            if (MessageHookPointsStatus.OK.equals(status)) {
-                processSuccessCounter.add(1, attributes);
-                processCostTimeHistogram.record(duration.toMillis(), attributes);
-            }
-            if (MessageHookPointsStatus.ERROR.equals(status)) {
-                processFailureCounter.add(1, attributes);
-                processCostTimeHistogram.record(duration.toMillis(), attributes);
-            }
+            InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
+                InvocationStatus.FAILURE;
+            Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
+                .put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
+                .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+                .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
+                .build();
+            processCostTimeHistogram.record(duration.toMillis(), attributes);
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
index a4e984f..caf2ef2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
@@ -18,40 +18,41 @@
 package org.apache.rocketmq.client.java.metrics;
 
 public enum MetricName {
-    /**
-     * A counter that records the number of failed api calls of message publishing.
-     */
-    SEND_FAILURE_TOTAL("rocketmq_send_failure_total"),
     /**
      * A histogram that records the cost time of successful api calls of message publishing.
+     *
+     * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#INVOCATION_STATUS}.
      */
-    SEND_SUCCESS_COST_TIME("rocketmq_send_success_cost_time"),
-    /**
-     * A counter that records the number of successful consumption of message.
-     */
-    PROCESS_SUCCESS_TOTAL("rocketmq_process_success_total"),
-    /**
-     * A counter that records the number of failed consumption of message.
-     */
-    PROCESS_FAILURE_TOTAL("rocketmq_process_failure_total"),
+    SEND_SUCCESS_COST_TIME("rocketmq_send_cost_time"),
     /**
      * A gauge that records the cached message count of push consumer.
+     *
+     * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
      */
     CONSUMER_CACHED_MESSAGES("rocketmq_consumer_cached_messages"),
     /**
      * A gauge that records the cached message bytes of push consumer.
+     *
+     * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
      */
     CONSUMER_CACHED_BYTES("rocketmq_consumer_cached_bytes"),
     /**
      * A histogram that records the latency of message delivery from remote.
+     *
+     * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
      */
     DELIVERY_LATENCY("rocketmq_delivery_latency"),
     /**
      * A histogram that records await time of message consumption.
+     *
+     * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP}.
      */
     AWAIT_TIME("rocketmq_await_time"),
     /**
-     * A counter that records the process time of message consumption.
+     * A histogram that records the process time of message consumption.
+     *
+     * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID}, {@link MetricLabels#CONSUMER_GROUP},
+     * {@link MetricLabels#INVOCATION_STATUS}.
      */
     PROCESS_TIME("rocketmq_process_time");