You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/12 13:14:08 UTC
[rocketmq-clients] 01/01: Java: refactor MessageMeter
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 0c74109b522c3dd2219458dc664da802d24e5fd7
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Jul 12 21:11:48 2022 +0800
Java: refactor MessageMeter
---
.../rocketmq/client/java/impl/ClientImpl.java | 8 +-
.../java/impl/consumer/PushConsumerImpl.java | 2 +-
.../rocketmq/client/java/metrics/MessageMeter.java | 215 +++++----------------
.../client/java/metrics/MessageMeterProvider.java | 182 +++++++++++++++++
.../rocketmq/client/java/metrics/Metric.java | 11 +-
.../java/metrics/MetricMessageInterceptor.java | 51 +++--
6 files changed, 272 insertions(+), 197 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index e47bdcd..0c801d5 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageCommon;
-import org.apache.rocketmq.client.java.metrics.MessageMeter;
+import org.apache.rocketmq.client.java.metrics.MessageMeterProvider;
import org.apache.rocketmq.client.java.metrics.Metric;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
@@ -97,7 +97,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
// Thread-safe set.
protected final Set<Endpoints> isolated;
protected final ExecutorService clientCallbackExecutor;
- protected final MessageMeter messageMeter;
+ protected final MessageMeterProvider messageMeterProvider;
/**
* Telemetry command executor, which is aims to execute commands from remote.
*/
@@ -147,7 +147,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
new LinkedBlockingQueue<>(),
new ThreadFactoryImpl("ClientCallbackWorker"));
- this.messageMeter = new MessageMeter(this);
+ this.messageMeterProvider = new MessageMeterProvider(this);
this.telemetryCommandExecutor = new ThreadPoolExecutor(
1,
@@ -317,7 +317,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
*/
public final void onSettingsCommand(Endpoints endpoints, Settings settings) {
final Metric metric = new Metric(settings.getMetric());
- messageMeter.refresh(metric);
+ messageMeterProvider.reset(metric);
LOGGER.info("Receive settings from remote, endpoints={}", endpoints);
this.getClientSettings().applySettingsCommand(settings);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 41648bd..9b34b56 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -153,7 +153,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
try {
LOGGER.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
super.startUp();
- messageMeter.setMessageCacheObserver(this);
+ messageMeterProvider.setMessageCacheObserver(this);
final ScheduledExecutorService scheduler = clientManager.getScheduler();
this.consumeService = createConsumeService();
this.consumeService.startAsync().awaitRunning();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 2e7f287..a70f0d7 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -17,211 +17,88 @@
package org.apache.rocketmq.client.java.metrics;
-import io.grpc.ManagedChannel;
-import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
-import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import io.opentelemetry.api.common.Attributes;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.MoreObjects;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
-import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
-import io.opentelemetry.sdk.OpenTelemetrySdk;
-import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
-import io.opentelemetry.sdk.metrics.View;
-import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
-import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
-import java.net.InetSocketAddress;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
-import javax.net.ssl.SSLException;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
-import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageMeter {
+ static final MessageMeter DISABLED = new MessageMeter(); // Shared placeholder used while metrics are off; never reassigned.
private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class);
- private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3);
- private static final Duration METRIC_READER_INTERVAL = Duration.ofMinutes(1);
- private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message";
-
- private final ClientImpl client;
-
- private volatile Meter meter;
- private volatile Endpoints metricEndpoints;
- private volatile SdkMeterProvider provider;
-
- private volatile MessageCacheObserver messageCacheObserver;
-
+ private final boolean enabled;
+ private final Meter meter;
+ private final Endpoints endpoints;
+ private final SdkMeterProvider provider;
private final ConcurrentMap<MetricName, DoubleHistogram> histogramMap;
- public MessageMeter(ClientImpl client) {
- this.client = client;
+ public MessageMeter(Meter meter, Endpoints endpoints, SdkMeterProvider provider) {
+ this.enabled = true;
+ this.meter = checkNotNull(meter, "meter should not be null");
+ this.endpoints = checkNotNull(endpoints, "endpoints should not be null");
+ this.provider = checkNotNull(provider, "provider should not be null");
this.histogramMap = new ConcurrentHashMap<>();
- this.client.registerMessageInterceptor(new MetricMessageInterceptor(this));
- this.messageCacheObserver = null;
}
- public void setMessageCacheObserver(MessageCacheObserver messageCacheObserver) {
- this.messageCacheObserver = messageCacheObserver;
+ private MessageMeter() {
+ this.enabled = false;
+ this.meter = null;
+ this.endpoints = null;
+ this.provider = null;
+ this.histogramMap = new ConcurrentHashMap<>();
}
- DoubleHistogram getHistogramByName(MetricName metricName) {
- return histogramMap.computeIfAbsent(metricName, name -> meter.histogramBuilder(name.getName()).build());
+ public boolean isEnabled() {
+ return enabled;
}
- public synchronized void refresh(Metric metric) {
- final String clientId = client.getClientId();
- try {
- if (!metric.isOn()) {
- LOGGER.info("Skip metric refresh because metric is off, clientId={}", clientId);
- shutdown();
- return;
- }
- final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints();
- if (!optionalEndpoints.isPresent()) {
- LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}",
- clientId);
- return;
- }
- final Endpoints existedEndpoints = metricEndpoints;
- final Endpoints newMetricEndpoints = optionalEndpoints.get();
- if (newMetricEndpoints.equals(metricEndpoints)) {
- LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
- clientId, newMetricEndpoints);
- return;
- }
- this.reset(newMetricEndpoints);
- LOGGER.info("Message meter endpoints is updated, clientId={}, {} => {}", clientId, existedEndpoints,
- newMetricEndpoints);
- } catch (Throwable t) {
- LOGGER.error("Exception raised while refreshing message meter, clientId={}", clientId, t);
- }
+ public Endpoints getEndpoints() {
+ return endpoints;
+ }
+
+ Optional<DoubleHistogram> getHistogramByName(MetricName metricName) {
+ final DoubleHistogram histogram = histogramMap.computeIfAbsent(metricName, name -> enabled ?
+ meter.histogramBuilder(name.getName()).build() : null);
+ return null == histogram ? Optional.empty() : Optional.of(histogram);
}
- public synchronized void shutdown() {
- if (null == provider) {
+ public void shutdown() {
+ if (!enabled) {
return;
}
- final String clientId = client.getClientId();
- LOGGER.info("Begin to shutdown the message meter, clientId={}", clientId);
final CountDownLatch latch = new CountDownLatch(1);
provider.shutdown().whenComplete(latch::countDown);
try {
latch.await();
} catch (Throwable t) {
- LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", clientId);
+ LOGGER.error("Failed to shutdown message meter, endpoints={}", endpoints, t);
}
- LOGGER.info("Shutdown the message meter, clientId={}", clientId);
- // Clear endpoints.
- metricEndpoints = null;
- // Clear meter.
- meter = null;
- // Clear provider.
- provider = null;
- }
-
- public Meter getMeter() {
- return meter;
}
- public ClientImpl getClient() {
- return client;
- }
-
- @SuppressWarnings("deprecation")
- private void reset(Endpoints newMetricEndpoints) throws SSLException {
- final String clientId = client.getClientId();
- final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
- .build();
- final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
- .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
- final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
- if (null != socketAddresses) {
- IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
- channelBuilder.nameResolverFactory(metricResolverFactory);
+ public boolean satisfy(Metric metric) {
+ if (enabled && metric.isOn() && endpoints.equals(metric.getEndpoints())) {
+ return true;
}
- ManagedChannel channel = channelBuilder.build();
-
- OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
- .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT)
- .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred())
- .build();
-
- InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
- final View sendSuccessCostTimeView = View.builder()
- .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
-
- InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
- final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
- .build();
-
- InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
- final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
-
- InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
- final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
-
- PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
- .setInterval(METRIC_READER_INTERVAL).build();
-
- final SdkMeterProvider newProvider = SdkMeterProvider.builder().registerMetricReader(reader)
- .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
- .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
- .registerView(awaitTimeInstrumentSelector, awaitTimeView)
- .registerView(processTimeInstrumentSelector, processTimeView)
- .build();
+ return !enabled && !metric.isOn();
+ }
- final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(newProvider).build();
- meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
- shutdown();
- // Force clean existed histogram.
- histogramMap.clear();
- final ClientImpl client = this.getClient();
- if (!(client instanceof PushConsumer)) {
- // No need for producer and simple consumer.
- return;
- }
- final String consumerGroup = ((PushConsumer) client).getConsumerGroup();
- meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
- final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
- for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
- final String topic = entry.getKey();
- Attributes attributes = Attributes.builder()
- .put(MetricLabels.TOPIC, topic)
- .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, clientId).build();
- measurement.record(entry.getValue(), attributes);
- }
- });
- meter.gaugeBuilder(MetricName.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement -> {
- final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
- for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
- final String topic = entry.getKey();
- Attributes attributes = Attributes.builder()
- .put(MetricLabels.TOPIC, topic)
- .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, clientId).build();
- measurement.record(entry.getValue(), attributes);
- }
- });
- this.provider = newProvider;
- this.metricEndpoints = newMetricEndpoints;
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("enabled", enabled)
+ .add("meter", meter)
+ .add("metricEndpoints", endpoints)
+ .add("provider", provider)
+ .add("histogramMap", histogramMap)
+ .toString();
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java
new file mode 100644
index 0000000..6a1b548
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.java.impl.ClientImpl;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
+import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageMeterProvider {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeterProvider.class);
+
+ private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3);
+ private static final Duration METRIC_READER_INTERVAL = Duration.ofMinutes(1); // Export period handed to the periodic metric reader.
+ private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message";
+
+ private final ClientImpl client;
+
+ private volatile MessageMeter messageMeter;
+
+ private volatile MessageCacheObserver messageCacheObserver;
+
+ public MessageMeterProvider(ClientImpl client) {
+ this.client = client;
+ this.client.registerMessageInterceptor(new MetricMessageInterceptor(this));
+ this.messageMeter = MessageMeter.DISABLED;
+ this.messageCacheObserver = null;
+ }
+
+ public void setMessageCacheObserver(MessageCacheObserver messageCacheObserver) {
+ this.messageCacheObserver = messageCacheObserver;
+ }
+
+ Optional<DoubleHistogram> getHistogramByName(MetricName metricName) {
+ return messageMeter.getHistogramByName(metricName);
+ }
+
+ public synchronized void reset(Metric metric) {
+ final String clientId = client.getClientId();
+ try {
+ if (messageMeter.satisfy(metric)) {
+ LOGGER.debug("Metric settings is satisfied by the current message meter, clientId={}", clientId);
+ return;
+ }
+ if (!metric.isOn()) {
+ LOGGER.debug("Metric is off, clientId={}", clientId);
+ messageMeter.shutdown();
+ messageMeter = MessageMeter.DISABLED;
+ return;
+ }
+ final Endpoints endpoints = metric.getEndpoints();
+ final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .build();
+ final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
+ .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
+ final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses();
+ if (null != socketAddresses) {
+ IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
+ channelBuilder.nameResolverFactory(metricResolverFactory);
+ }
+ ManagedChannel channel = channelBuilder.build();
+ OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
+ .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT)
+ .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred())
+ .build();
+
+ InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
+ final View sendSuccessCostTimeView = View.builder()
+ .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
+
+ InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
+ final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
+ .build();
+
+ InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
+ final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
+
+ InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
+ final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
+
+ PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
+ .setInterval(METRIC_READER_INTERVAL).build();
+
+ final SdkMeterProvider provider = SdkMeterProvider.builder().registerMetricReader(reader)
+ .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
+ .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
+ .registerView(awaitTimeInstrumentSelector, awaitTimeView)
+ .registerView(processTimeInstrumentSelector, processTimeView)
+ .build();
+
+ final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
+ Meter meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
+
+ // Reset message meter.
+ MessageMeter existedMessageMeter = messageMeter;
+ messageMeter = new MessageMeter(meter, endpoints, provider);
+ existedMessageMeter.shutdown();
+ LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints, clientId);
+
+ if (!(client instanceof PushConsumer)) {
+ // No need for producer and simple consumer.
+ return;
+ }
+ final String consumerGroup = ((PushConsumer) client).getConsumerGroup();
+ meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
+ final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
+ for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
+ final String topic = entry.getKey();
+ Attributes attributes = Attributes.builder()
+ .put(MetricLabels.TOPIC, topic)
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, clientId).build();
+ measurement.record(entry.getValue(), attributes);
+ }
+ });
+ meter.gaugeBuilder(MetricName.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement -> {
+ final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
+ for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
+ final String topic = entry.getKey();
+ Attributes attributes = Attributes.builder()
+ .put(MetricLabels.TOPIC, topic)
+ .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
+ .put(MetricLabels.CLIENT_ID, clientId).build();
+ measurement.record(entry.getValue(), attributes);
+ }
+ });
+ } catch (Throwable t) {
+ LOGGER.error("Exception raised when resetting message meter, clientId={}", clientId, t);
+ }
+ }
+
+ public boolean isEnabled() {
+ return messageMeter.isEnabled();
+ }
+
+ public ClientImpl getClient() {
+ return client;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
index 7099a04..0cb7224 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
@@ -17,20 +17,19 @@
package org.apache.rocketmq.client.java.metrics;
-import java.util.Optional;
import org.apache.rocketmq.client.java.route.Endpoints;
public class Metric {
- private final Endpoints metricEndpoints;
+ private final Endpoints endpoints;
private final boolean on;
public Metric(apache.rocketmq.v2.Metric metric) {
- this.metricEndpoints = metric.hasEndpoints() ? new Endpoints(metric.getEndpoints()) : null;
- this.on = metric.getOn();
+ this.endpoints = metric.hasEndpoints() ? new Endpoints(metric.getEndpoints()) : null;
+ this.on = metric.getOn() && metric.hasEndpoints();
}
- public Optional<Endpoints> tryGetMetricEndpoints() {
- return null == metricEndpoints ? Optional.empty() : Optional.of(metricEndpoints);
+ public Endpoints getEndpoints() {
+ return endpoints;
}
public boolean isOn() {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 2de9c53..a798455 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -21,7 +21,6 @@ import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
-import io.opentelemetry.api.metrics.Meter;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
@@ -38,20 +37,25 @@ import org.slf4j.LoggerFactory;
public class MetricMessageInterceptor implements MessageInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricMessageInterceptor.class);
- private final MessageMeter messageMeter;
+ private final MessageMeterProvider messageMeterProvider;
- public MetricMessageInterceptor(MessageMeter messageMeter) {
- this.messageMeter = messageMeter;
+ public MetricMessageInterceptor(MessageMeterProvider messageMeterProvider) {
+ this.messageMeterProvider = messageMeterProvider;
}
private void doAfterSendMessage(List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
+ final Optional<DoubleHistogram> optionalHistogram =
+ messageMeterProvider.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
+ if (!optionalHistogram.isPresent()) {
+ return;
+ }
+ final DoubleHistogram histogram = optionalHistogram.get();
for (MessageCommon messageCommon : messageCommons) {
InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
InvocationStatus.FAILURE;
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
- .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID, messageMeterProvider.getClient().getClientId())
.put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build();
histogram.record(duration.toMillis(), attributes);
}
@@ -61,7 +65,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
if (messageCommons.isEmpty()) {
return;
}
- final ClientImpl client = messageMeter.getClient();
+ final ClientImpl client = messageMeterProvider.getClient();
String consumerGroup = null;
if (client instanceof PushConsumer) {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -80,7 +84,12 @@ public class MetricMessageInterceptor implements MessageInterceptor {
}
final Timestamp deliveryTimestampFromRemote = optionalDeliveryTimestampFromRemote.get();
final long latency = System.currentTimeMillis() - Timestamps.toMillis(deliveryTimestampFromRemote);
- final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.DELIVERY_LATENCY);
+ final Optional<DoubleHistogram> optionalHistogram =
+ messageMeterProvider.getHistogramByName(MetricName.DELIVERY_LATENCY);
+ if (!optionalHistogram.isPresent()) {
+ return;
+ }
+ final DoubleHistogram histogram = optionalHistogram.get();
final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
.put(MetricLabels.CLIENT_ID, client.getClientId()).build();
@@ -88,7 +97,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
}
private void doBeforeConsumeMessage(List<MessageCommon> messageCommons) {
- final ClientImpl client = messageMeter.getClient();
+ final ClientImpl client = messageMeterProvider.getClient();
String consumerGroup = null;
if (client instanceof PushConsumer) {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
@@ -106,14 +115,18 @@ public class MetricMessageInterceptor implements MessageInterceptor {
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
.put(MetricLabels.CLIENT_ID, client.getClientId()).build();
- final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.AWAIT_TIME);
+ final Optional<DoubleHistogram> optionalHistogram =
+ messageMeterProvider.getHistogramByName(MetricName.AWAIT_TIME);
+ if (!optionalHistogram.isPresent()) {
+ return;
+ }
+ final DoubleHistogram histogram = optionalHistogram.get();
histogram.record(durationAfterDecoding.toMillis(), attributes);
}
private void doAfterProcessMessage(List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.PROCESS_TIME);
- final ClientImpl client = messageMeter.getClient();
+ final ClientImpl client = messageMeterProvider.getClient();
if (!(client instanceof PushConsumer)) {
// Should never reach here.
LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.getClientId());
@@ -125,17 +138,22 @@ public class MetricMessageInterceptor implements MessageInterceptor {
InvocationStatus.FAILURE;
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
- .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID, messageMeterProvider.getClient().getClientId())
.put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
.build();
+ final Optional<DoubleHistogram> optionalHistogram =
+ messageMeterProvider.getHistogramByName(MetricName.PROCESS_TIME);
+ if (!optionalHistogram.isPresent()) {
+ return;
+ }
+ final DoubleHistogram histogram = optionalHistogram.get();
histogram.record(duration.toMillis(), attributes);
}
}
@Override
public void doBefore(MessageHookPoints messageHookPoints, List<MessageCommon> messageCommons) {
- final Meter meter = messageMeter.getMeter();
- if (null == meter) {
+ if (!messageMeterProvider.isEnabled()) {
return;
}
if (MessageHookPoints.CONSUME.equals(messageHookPoints)) {
@@ -146,8 +164,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
@Override
public void doAfter(MessageHookPoints messageHookPoints, List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- final Meter meter = messageMeter.getMeter();
- if (null == meter) {
+ if (!messageMeterProvider.isEnabled()) {
return;
}
switch (messageHookPoints) {