You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/12 13:29:01 UTC

[rocketmq-clients] branch master updated: Java: refactor MessageMeter (#42)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 890dcb4  Java: refactor MessageMeter (#42)
890dcb4 is described below

commit 890dcb498944196fb08f4f335380185104169ad1
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Jul 12 21:28:56 2022 +0800

    Java: refactor MessageMeter (#42)
---
 .../rocketmq/client/java/impl/ClientImpl.java      |   8 +-
 .../java/impl/consumer/PushConsumerImpl.java       |   2 +-
 .../rocketmq/client/java/metrics/MessageMeter.java | 215 +++++----------------
 .../client/java/metrics/MessageMeterProvider.java  | 182 +++++++++++++++++
 .../rocketmq/client/java/metrics/Metric.java       |  11 +-
 .../java/metrics/MetricMessageInterceptor.java     |  51 +++--
 6 files changed, 272 insertions(+), 197 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index e47bdcd..0c801d5 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageCommon;
-import org.apache.rocketmq.client.java.metrics.MessageMeter;
+import org.apache.rocketmq.client.java.metrics.MessageMeterProvider;
 import org.apache.rocketmq.client.java.metrics.Metric;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
@@ -97,7 +97,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     // Thread-safe set.
     protected final Set<Endpoints> isolated;
     protected final ExecutorService clientCallbackExecutor;
-    protected final MessageMeter messageMeter;
+    protected final MessageMeterProvider messageMeterProvider;
     /**
      * Telemetry command executor, which is aims to execute commands from remote.
      */
@@ -147,7 +147,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             new LinkedBlockingQueue<>(),
             new ThreadFactoryImpl("ClientCallbackWorker"));
 
-        this.messageMeter = new MessageMeter(this);
+        this.messageMeterProvider = new MessageMeterProvider(this);
 
         this.telemetryCommandExecutor = new ThreadPoolExecutor(
             1,
@@ -317,7 +317,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      */
     public final void onSettingsCommand(Endpoints endpoints, Settings settings) {
         final Metric metric = new Metric(settings.getMetric());
-        messageMeter.refresh(metric);
+        messageMeterProvider.reset(metric);
         LOGGER.info("Receive settings from remote, endpoints={}", endpoints);
         this.getClientSettings().applySettingsCommand(settings);
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 41648bd..9b34b56 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -153,7 +153,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         try {
             LOGGER.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
             super.startUp();
-            messageMeter.setMessageCacheObserver(this);
+            messageMeterProvider.setMessageCacheObserver(this);
             final ScheduledExecutorService scheduler = clientManager.getScheduler();
             this.consumeService = createConsumeService();
             this.consumeService.startAsync().awaitRunning();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 2e7f287..a70f0d7 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -17,211 +17,88 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
-import io.grpc.ManagedChannel;
-import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
-import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import io.opentelemetry.api.common.Attributes;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.MoreObjects;
 import io.opentelemetry.api.metrics.DoubleHistogram;
 import io.opentelemetry.api.metrics.Meter;
-import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
-import io.opentelemetry.sdk.OpenTelemetrySdk;
-import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.InstrumentType;
 import io.opentelemetry.sdk.metrics.SdkMeterProvider;
-import io.opentelemetry.sdk.metrics.View;
-import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
-import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
-import java.net.InetSocketAddress;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import javax.net.ssl.SSLException;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
-import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MessageMeter {
+    static MessageMeter DISABLED = new MessageMeter();
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class);
 
-    private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3);
-    private static final Duration METRIC_READER_INTERVAL = Duration.ofMinutes(1);
-    private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message";
-
-    private final ClientImpl client;
-
-    private volatile Meter meter;
-    private volatile Endpoints metricEndpoints;
-    private volatile SdkMeterProvider provider;
-
-    private volatile MessageCacheObserver messageCacheObserver;
-
+    private final boolean enabled;
+    private final Meter meter;
+    private final Endpoints endpoints;
+    private final SdkMeterProvider provider;
     private final ConcurrentMap<MetricName, DoubleHistogram> histogramMap;
 
-    public MessageMeter(ClientImpl client) {
-        this.client = client;
+    public MessageMeter(Meter meter, Endpoints endpoints, SdkMeterProvider provider) {
+        this.enabled = true;
+        this.meter = checkNotNull(meter, "meter should not be null");
+        this.endpoints = checkNotNull(endpoints, "endpoints should not be null");
+        this.provider = checkNotNull(provider, "provider should not be null");
         this.histogramMap = new ConcurrentHashMap<>();
-        this.client.registerMessageInterceptor(new MetricMessageInterceptor(this));
-        this.messageCacheObserver = null;
     }
 
-    public void setMessageCacheObserver(MessageCacheObserver messageCacheObserver) {
-        this.messageCacheObserver = messageCacheObserver;
+    private MessageMeter() {
+        this.enabled = false;
+        this.meter = null;
+        this.endpoints = null;
+        this.provider = null;
+        this.histogramMap = new ConcurrentHashMap<>();
     }
 
-    DoubleHistogram getHistogramByName(MetricName metricName) {
-        return histogramMap.computeIfAbsent(metricName, name -> meter.histogramBuilder(name.getName()).build());
+    public boolean isEnabled() {
+        return enabled;
     }
 
-    public synchronized void refresh(Metric metric) {
-        final String clientId = client.getClientId();
-        try {
-            if (!metric.isOn()) {
-                LOGGER.info("Skip metric refresh because metric is off, clientId={}", clientId);
-                shutdown();
-                return;
-            }
-            final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints();
-            if (!optionalEndpoints.isPresent()) {
-                LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}",
-                    clientId);
-                return;
-            }
-            final Endpoints existedEndpoints = metricEndpoints;
-            final Endpoints newMetricEndpoints = optionalEndpoints.get();
-            if (newMetricEndpoints.equals(metricEndpoints)) {
-                LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
-                    clientId, newMetricEndpoints);
-                return;
-            }
-            this.reset(newMetricEndpoints);
-            LOGGER.info("Message meter endpoints is updated, clientId={}, {} => {}", clientId, existedEndpoints,
-                newMetricEndpoints);
-        } catch (Throwable t) {
-            LOGGER.error("Exception raised while refreshing message meter, clientId={}", clientId, t);
-        }
+    public Endpoints getEndpoints() {
+        return endpoints;
+    }
+
+    Optional<DoubleHistogram> getHistogramByName(MetricName metricName) {
+        final DoubleHistogram histogram = histogramMap.computeIfAbsent(metricName, name -> enabled ?
+            meter.histogramBuilder(name.getName()).build() : null);
+        return null == histogram ? Optional.empty() : Optional.of(histogram);
     }
 
-    public synchronized void shutdown() {
-        if (null == provider) {
+    public void shutdown() {
+        if (!enabled) {
             return;
         }
-        final String clientId = client.getClientId();
-        LOGGER.info("Begin to shutdown the message meter, clientId={}", clientId);
         final CountDownLatch latch = new CountDownLatch(1);
         provider.shutdown().whenComplete(latch::countDown);
         try {
             latch.await();
         } catch (Throwable t) {
-            LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", clientId);
+            LOGGER.error("Failed to shutdown message meter, endpoints={}", endpoints, t);
         }
-        LOGGER.info("Shutdown the message meter, clientId={}", clientId);
-        // Clear endpoints.
-        metricEndpoints = null;
-        // Clear meter.
-        meter = null;
-        // Clear provider.
-        provider = null;
-    }
-
-    public Meter getMeter() {
-        return meter;
     }
 
-    public ClientImpl getClient() {
-        return client;
-    }
-
-    @SuppressWarnings("deprecation")
-    private void reset(Endpoints newMetricEndpoints) throws SSLException {
-        final String clientId = client.getClientId();
-        final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
-            .build();
-        final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
-            .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
-        final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
-        if (null != socketAddresses) {
-            IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
-            channelBuilder.nameResolverFactory(metricResolverFactory);
+    public boolean satisfy(Metric metric) {
+        if (enabled && metric.isOn() && endpoints.equals(metric.getEndpoints())) {
+            return true;
         }
-        ManagedChannel channel = channelBuilder.build();
-
-        OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
-            .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT)
-            .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred())
-            .build();
-
-        InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
-            .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
-        final View sendSuccessCostTimeView = View.builder()
-            .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
-
-        InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
-            .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
-        final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
-            .build();
-
-        InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
-            .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
-        final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
-
-        InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
-            .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
-        final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
-
-        PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
-            .setInterval(METRIC_READER_INTERVAL).build();
-
-        final SdkMeterProvider newProvider = SdkMeterProvider.builder().registerMetricReader(reader)
-            .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
-            .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
-            .registerView(awaitTimeInstrumentSelector, awaitTimeView)
-            .registerView(processTimeInstrumentSelector, processTimeView)
-            .build();
+        return !enabled && !metric.isOn();
+    }
 
-        final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(newProvider).build();
-        meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
-        shutdown();
-        // Force clean existed histogram.
-        histogramMap.clear();
-        final ClientImpl client = this.getClient();
-        if (!(client instanceof PushConsumer)) {
-            // No need for producer and simple consumer.
-            return;
-        }
-        final String consumerGroup = ((PushConsumer) client).getConsumerGroup();
-        meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
-            final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
-            for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
-                final String topic = entry.getKey();
-                Attributes attributes = Attributes.builder()
-                    .put(MetricLabels.TOPIC, topic)
-                    .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-                    .put(MetricLabels.CLIENT_ID, clientId).build();
-                measurement.record(entry.getValue(), attributes);
-            }
-        });
-        meter.gaugeBuilder(MetricName.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement -> {
-            final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
-            for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
-                final String topic = entry.getKey();
-                Attributes attributes = Attributes.builder()
-                    .put(MetricLabels.TOPIC, topic)
-                    .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-                    .put(MetricLabels.CLIENT_ID, clientId).build();
-                measurement.record(entry.getValue(), attributes);
-            }
-        });
-        this.provider = newProvider;
-        this.metricEndpoints = newMetricEndpoints;
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("enabled", enabled)
+            .add("meter", meter)
+            .add("metricEndpoints", endpoints)
+            .add("provider", provider)
+            .add("histogramMap", histogramMap)
+            .toString();
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java
new file mode 100644
index 0000000..6a1b548
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
+import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageMeterProvider {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeterProvider.class);
+
+    private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3);
+    private static final Duration METRIC_READER_INTERVAL = Duration.ofSeconds(1);
+    private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message";
+
+    private final ClientImpl client;
+
+    private volatile MessageMeter messageMeter;
+
+    private volatile MessageCacheObserver messageCacheObserver;
+
+    public MessageMeterProvider(ClientImpl client) {
+        this.client = client;
+        this.client.registerMessageInterceptor(new MetricMessageInterceptor(this));
+        this.messageMeter = MessageMeter.DISABLED;
+        this.messageCacheObserver = null;
+    }
+
+    public void setMessageCacheObserver(MessageCacheObserver messageCacheObserver) {
+        this.messageCacheObserver = messageCacheObserver;
+    }
+
+    Optional<DoubleHistogram> getHistogramByName(MetricName metricName) {
+        return messageMeter.getHistogramByName(metricName);
+    }
+
+    public synchronized void reset(Metric metric) {
+        final String clientId = client.getClientId();
+        try {
+            if (messageMeter.satisfy(metric)) {
+                LOGGER.debug("Metric settings is satisfied by the current message meter, clientId={}", clientId);
+                return;
+            }
+            if (!metric.isOn()) {
+                LOGGER.debug("Metric is off, clientId={}", clientId);
+                messageMeter.shutdown();
+                messageMeter = MessageMeter.DISABLED;
+                return;
+            }
+            final Endpoints endpoints = metric.getEndpoints();
+            final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
+                .build();
+            final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
+                .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
+            final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses();
+            if (null != socketAddresses) {
+                IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
+                channelBuilder.nameResolverFactory(metricResolverFactory);
+            }
+            ManagedChannel channel = channelBuilder.build();
+            OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
+                .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT)
+                .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred())
+                .build();
+
+            InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
+            final View sendSuccessCostTimeView = View.builder()
+                .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
+
+            InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
+            final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
+                .build();
+
+            InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
+            final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
+
+            InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
+            final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
+
+            PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
+                .setInterval(METRIC_READER_INTERVAL).build();
+
+            final SdkMeterProvider provider = SdkMeterProvider.builder().registerMetricReader(reader)
+                .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
+                .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
+                .registerView(awaitTimeInstrumentSelector, awaitTimeView)
+                .registerView(processTimeInstrumentSelector, processTimeView)
+                .build();
+
+            final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
+            Meter meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
+
+            // Reset message meter.
+            MessageMeter existedMessageMeter = messageMeter;
+            messageMeter = new MessageMeter(meter, endpoints, provider);
+            existedMessageMeter.shutdown();
+            LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints, clientId);
+
+            if (!(client instanceof PushConsumer)) {
+                // No need for producer and simple consumer.
+                return;
+            }
+            final String consumerGroup = ((PushConsumer) client).getConsumerGroup();
+            meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
+                final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
+                for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
+                    final String topic = entry.getKey();
+                    Attributes attributes = Attributes.builder()
+                        .put(MetricLabels.TOPIC, topic)
+                        .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+                        .put(MetricLabels.CLIENT_ID, clientId).build();
+                    measurement.record(entry.getValue(), attributes);
+                }
+            });
+            meter.gaugeBuilder(MetricName.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement -> {
+                final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
+                for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
+                    final String topic = entry.getKey();
+                    Attributes attributes = Attributes.builder()
+                        .put(MetricLabels.TOPIC, topic)
+                        .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+                        .put(MetricLabels.CLIENT_ID, clientId).build();
+                    measurement.record(entry.getValue(), attributes);
+                }
+            });
+        } catch (Throwable t) {
+            LOGGER.error("Exception raised when resetting message meter, clientId={}", clientId, t);
+        }
+    }
+
+    public boolean isEnabled() {
+        return messageMeter.isEnabled();
+    }
+
+    public ClientImpl getClient() {
+        return client;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
index 7099a04..0cb7224 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
@@ -17,20 +17,19 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
-import java.util.Optional;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
 public class Metric {
-    private final Endpoints metricEndpoints;
+    private final Endpoints endpoints;
     private final boolean on;
 
     public Metric(apache.rocketmq.v2.Metric metric) {
-        this.metricEndpoints = metric.hasEndpoints() ? new Endpoints(metric.getEndpoints()) : null;
-        this.on = metric.getOn();
+        this.endpoints = metric.hasEndpoints() ? new Endpoints(metric.getEndpoints()) : null;
+        this.on = metric.getOn() && metric.hasEndpoints();
     }
 
-    public Optional<Endpoints> tryGetMetricEndpoints() {
-        return null == metricEndpoints ? Optional.empty() : Optional.of(metricEndpoints);
+    public Endpoints getEndpoints() {
+        return endpoints;
     }
 
     public boolean isOn() {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 2de9c53..a798455 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -21,7 +21,6 @@ import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.DoubleHistogram;
-import io.opentelemetry.api.metrics.Meter;
 import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
@@ -38,20 +37,25 @@ import org.slf4j.LoggerFactory;
 public class MetricMessageInterceptor implements MessageInterceptor {
     private static final Logger LOGGER = LoggerFactory.getLogger(MetricMessageInterceptor.class);
 
-    private final MessageMeter messageMeter;
+    private final MessageMeterProvider messageMeterProvider;
 
-    public MetricMessageInterceptor(MessageMeter messageMeter) {
-        this.messageMeter = messageMeter;
+    public MetricMessageInterceptor(MessageMeterProvider messageMeterProvider) {
+        this.messageMeterProvider = messageMeterProvider;
     }
 
     private void doAfterSendMessage(List<MessageCommon> messageCommons, Duration duration,
         MessageHookPointsStatus status) {
-        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
+        final Optional<DoubleHistogram> optionalHistogram =
+            messageMeterProvider.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
+        if (!optionalHistogram.isPresent()) {
+            return;
+        }
+        final DoubleHistogram histogram = optionalHistogram.get();
         for (MessageCommon messageCommon : messageCommons) {
             InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
                 InvocationStatus.FAILURE;
             Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
-                .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+                .put(MetricLabels.CLIENT_ID, messageMeterProvider.getClient().getClientId())
                 .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build();
             histogram.record(duration.toMillis(), attributes);
         }
@@ -61,7 +65,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
         if (messageCommons.isEmpty()) {
             return;
         }
-        final ClientImpl client = messageMeter.getClient();
+        final ClientImpl client = messageMeterProvider.getClient();
         String consumerGroup = null;
         if (client instanceof PushConsumer) {
             consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -80,7 +84,12 @@ public class MetricMessageInterceptor implements MessageInterceptor {
         }
         final Timestamp deliveryTimestampFromRemote = optionalDeliveryTimestampFromRemote.get();
         final long latency = System.currentTimeMillis() - Timestamps.toMillis(deliveryTimestampFromRemote);
-        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.DELIVERY_LATENCY);
+        final Optional<DoubleHistogram> optionalHistogram =
+            messageMeterProvider.getHistogramByName(MetricName.DELIVERY_LATENCY);
+        if (!optionalHistogram.isPresent()) {
+            return;
+        }
+        final DoubleHistogram histogram = optionalHistogram.get();
         final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
             .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
             .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
@@ -88,7 +97,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
     }
 
     private void doBeforeConsumeMessage(List<MessageCommon> messageCommons) {
-        final ClientImpl client = messageMeter.getClient();
+        final ClientImpl client = messageMeterProvider.getClient();
         String consumerGroup = null;
         if (client instanceof PushConsumer) {
             consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -106,14 +115,18 @@ public class MetricMessageInterceptor implements MessageInterceptor {
         Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
             .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
             .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
-        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.AWAIT_TIME);
+        final Optional<DoubleHistogram> optionalHistogram =
+            messageMeterProvider.getHistogramByName(MetricName.AWAIT_TIME);
+        if (!optionalHistogram.isPresent()) {
+            return;
+        }
+        final DoubleHistogram histogram = optionalHistogram.get();
         histogram.record(durationAfterDecoding.toMillis(), attributes);
     }
 
     private void doAfterProcessMessage(List<MessageCommon> messageCommons, Duration duration,
         MessageHookPointsStatus status) {
-        final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.PROCESS_TIME);
-        final ClientImpl client = messageMeter.getClient();
+        final ClientImpl client = messageMeterProvider.getClient();
         if (!(client instanceof PushConsumer)) {
             // Should never reach here.
             LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.getClientId());
@@ -125,17 +138,22 @@ public class MetricMessageInterceptor implements MessageInterceptor {
                 InvocationStatus.FAILURE;
             Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
                 .put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
-                .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+                .put(MetricLabels.CLIENT_ID, messageMeterProvider.getClient().getClientId())
                 .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
                 .build();
+            final Optional<DoubleHistogram> optionalHistogram =
+                messageMeterProvider.getHistogramByName(MetricName.PROCESS_TIME);
+            if (!optionalHistogram.isPresent()) {
+                return;
+            }
+            final DoubleHistogram histogram = optionalHistogram.get();
             histogram.record(duration.toMillis(), attributes);
         }
     }
 
     @Override
     public void doBefore(MessageHookPoints messageHookPoints, List<MessageCommon> messageCommons) {
-        final Meter meter = messageMeter.getMeter();
-        if (null == meter) {
+        if (!messageMeterProvider.isEnabled()) {
             return;
         }
         if (MessageHookPoints.CONSUME.equals(messageHookPoints)) {
@@ -146,8 +164,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
     @Override
     public void doAfter(MessageHookPoints messageHookPoints, List<MessageCommon> messageCommons, Duration duration,
         MessageHookPointsStatus status) {
-        final Meter meter = messageMeter.getMeter();
-        if (null == meter) {
+        if (!messageMeterProvider.isEnabled()) {
             return;
         }
         switch (messageHookPoints) {