You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/08 08:48:42 UTC

[rocketmq-clients] branch master updated: Java: polish the part of metrics (#35)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new ba3da92  Java: polish the part of metrics (#35)
ba3da92 is described below

commit ba3da922781f4531c1cc354e24280245bab19109
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 8 16:48:34 2022 +0800

    Java: polish the part of metrics (#35)
---
 .../rocketmq/client/java/metrics/MessageMeter.java | 169 ++++++++++-----------
 .../java/metrics/MetricMessageInterceptor.java     |  17 +--
 2 files changed, 92 insertions(+), 94 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index c4e6bda..77a7948 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -40,6 +40,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
@@ -53,7 +54,7 @@ public class MessageMeter {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class);
 
     private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3);
-
+    private static final Duration METRIC_READER_INTERVAL = Duration.ofMinutes(1);
     private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message";
 
     private final ClientImpl client;
@@ -77,27 +78,10 @@ public class MessageMeter {
         this.messageCacheObserver = messageCacheObserver;
     }
 
-    DoubleHistogram getSendSuccessCostTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME,
-            name -> meter.histogramBuilder(name.getName()).build());
-    }
-
-    DoubleHistogram getProcessCostTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME,
-            name -> meter.histogramBuilder(name.getName()).build());
-    }
-
-    DoubleHistogram getMessageDeliveryLatencyHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY,
-            name -> meter.histogramBuilder(name.getName()).build());
+    DoubleHistogram getHistogramByName(MetricName metricName) {
+        return histogramMap.computeIfAbsent(metricName, name -> meter.histogramBuilder(name.getName()).build());
     }
 
-    DoubleHistogram getMessageAwaitTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME,
-            name -> meter.histogramBuilder(name.getName()).build());
-    }
-
-    @SuppressWarnings("deprecation")
     public synchronized void refresh(Metric metric) {
         final String clientId = client.getClientId();
         try {
@@ -112,60 +96,16 @@ public class MessageMeter {
                     clientId);
                 return;
             }
+            final Endpoints existedEndpoints = metricEndpoints;
             final Endpoints newMetricEndpoints = optionalEndpoints.get();
             if (newMetricEndpoints.equals(metricEndpoints)) {
                 LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
                     clientId, newMetricEndpoints);
                 return;
             }
-            final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
-                .build();
-            final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
-                .sslContext(sslContext)
-                .intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
-            final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
-            if (null != socketAddresses) {
-                IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
-                channelBuilder.nameResolverFactory(metricResolverFactory);
-            }
-            ManagedChannel channel = channelBuilder.build();
-
-            OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
-                .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT).build();
-
-            InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
-                .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
-            final View sendSuccessCostTimeView = View.builder()
-                .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
-
-            InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
-                .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
-            final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
-                .build();
-
-            InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
-                .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
-            final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
-
-            InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
-                .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
-            final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
-
-            PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
-                .setInterval(Duration.ofSeconds(1)).build();
-            provider = SdkMeterProvider.builder().registerMetricReader(reader)
-                .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
-                .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
-                .registerView(awaitTimeInstrumentSelector, awaitTimeView)
-                .registerView(processTimeInstrumentSelector, processTimeView)
-                .build();
-
-            final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
-            meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
-            LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", clientId,
-                metricEndpoints, newMetricEndpoints);
-            this.reset();
-            metricEndpoints = newMetricEndpoints;
+            this.reset(newMetricEndpoints);
+            LOGGER.info("Message meter endpoints is updated, clientId={}, {} => {}", clientId, existedEndpoints,
+                newMetricEndpoints);
         } catch (Throwable t) {
             LOGGER.error("Exception raised while refreshing message meter, clientId={}", clientId, t);
         }
@@ -182,19 +122,26 @@ public class MessageMeter {
         return Optional.empty();
     }
 
-    public void shutdown() {
-        if (null != provider) {
-            LOGGER.info("Begin to shutdown the message meter, clientId={}", client.getClientId());
-            final CountDownLatch latch = new CountDownLatch(1);
-            provider.shutdown().whenComplete(latch::countDown);
-            try {
-                latch.await();
-            } catch (Throwable t) {
-                LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}",
-                    client.getClientId());
-            }
-            LOGGER.info("Shutdown the message meter, clientId={}", client.getClientId());
+    public synchronized void shutdown() {
+        if (null == provider) {
+            return;
+        }
+        final String clientId = client.getClientId();
+        LOGGER.info("Begin to shutdown the message meter, clientId={}", clientId);
+        final CountDownLatch latch = new CountDownLatch(1);
+        provider.shutdown().whenComplete(latch::countDown);
+        try {
+            latch.await();
+        } catch (Throwable t) {
+            LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", clientId);
         }
+        LOGGER.info("Shutdown the message meter, clientId={}", clientId);
+        // Clear endpoints.
+        metricEndpoints = null;
+        // Clear meter.
+        meter = null;
+        // Clear provider.
+        provider = null;
     }
 
     public Meter getMeter() {
@@ -205,7 +152,55 @@ public class MessageMeter {
         return client;
     }
 
-    private void reset() {
+    @SuppressWarnings("deprecation")
+    private void reset(Endpoints newMetricEndpoints) throws SSLException {
+        final String clientId = client.getClientId();
+        final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
+            .build();
+        final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
+            .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
+        final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
+        if (null != socketAddresses) {
+            IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
+            channelBuilder.nameResolverFactory(metricResolverFactory);
+        }
+        ManagedChannel channel = channelBuilder.build();
+
+        OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
+            .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT).build();
+
+        InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
+        final View sendSuccessCostTimeView = View.builder()
+            .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
+
+        InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
+        final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
+            .build();
+
+        InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
+        final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
+
+        InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
+        final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
+
+        PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
+            .setInterval(METRIC_READER_INTERVAL).build();
+
+        final SdkMeterProvider newProvider = SdkMeterProvider.builder().registerMetricReader(reader)
+            .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
+            .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
+            .registerView(awaitTimeInstrumentSelector, awaitTimeView)
+            .registerView(processTimeInstrumentSelector, processTimeView)
+            .build();
+
+        final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(newProvider).build();
+        meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
+        shutdown();
+        // Force clean existed histogram.
         histogramMap.clear();
         meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
             final Optional<String> optionalConsumerGroup = tryGetConsumerGroup();
@@ -217,9 +212,10 @@ public class MessageMeter {
             final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
             for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
                 final String topic = entry.getKey();
-                Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+                Attributes attributes = Attributes.builder()
+                    .put(MetricLabels.TOPIC, topic)
                     .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-                    .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+                    .put(MetricLabels.CLIENT_ID, clientId).build();
                 measurement.record(entry.getValue(), attributes);
             }
         });
@@ -233,11 +229,14 @@ public class MessageMeter {
             final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
             for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
                 final String topic = entry.getKey();
-                Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+                Attributes attributes = Attributes.builder()
+                    .put(MetricLabels.TOPIC, topic)
                     .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-                    .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+                    .put(MetricLabels.CLIENT_ID, clientId).build();
                 measurement.record(entry.getValue(), attributes);
             }
         });
+        this.provider = newProvider;
+        this.metricEndpoints = newMetricEndpoints;
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 62fbe55..6f079fc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -45,15 +45,14 @@ public class MetricMessageInterceptor implements MessageInterceptor {
 
     private void doAfterSendMessage(List<MessageCommon> messageCommons, Duration duration,
         MessageHookPointsStatus status) {
-        final DoubleHistogram costTimeHistogram = messageMeter.getSendSuccessCostTimeHistogram();
+        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
         for (MessageCommon messageCommon : messageCommons) {
             InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
                 InvocationStatus.FAILURE;
             Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
                 .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
-                .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
-                .build();
-            costTimeHistogram.record(duration.toMillis(), attributes);
+                .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build();
+            histogram.record(duration.toMillis(), attributes);
         }
     }
 
@@ -77,11 +76,11 @@ public class MetricMessageInterceptor implements MessageInterceptor {
         }
         final Timestamp deliveryTimestampFromRemote = optionalDeliveryTimestampFromRemote.get();
         final long latency = System.currentTimeMillis() - Timestamps.toMillis(deliveryTimestampFromRemote);
-        final DoubleHistogram messageDeliveryLatencyHistogram = messageMeter.getMessageDeliveryLatencyHistogram();
+        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.DELIVERY_LATENCY);
         final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
             .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
             .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
-        messageDeliveryLatencyHistogram.record(latency, attributes);
+        histogram.record(latency, attributes);
     }
 
     private void doBeforeConsumeMessage(List<MessageCommon> messageCommons) {
@@ -100,13 +99,13 @@ public class MetricMessageInterceptor implements MessageInterceptor {
         Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
             .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
             .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
-        final DoubleHistogram histogram = messageMeter.getMessageAwaitTimeHistogram();
+        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.AWAIT_TIME);
         histogram.record(durationAfterDecoding.toMillis(), attributes);
     }
 
     private void doAfterProcessMessage(List<MessageCommon> messageCommons, Duration duration,
         MessageHookPointsStatus status) {
-        final DoubleHistogram processCostTimeHistogram = messageMeter.getProcessCostTimeHistogram();
+        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.PROCESS_TIME);
         final ClientImpl client = messageMeter.getClient();
         if (!(client instanceof PushConsumer)) {
             // Should never reach here.
@@ -122,7 +121,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
                 .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
                 .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
                 .build();
-            processCostTimeHistogram.record(duration.toMillis(), attributes);
+            histogram.record(duration.toMillis(), attributes);
         }
     }