You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/08 08:48:42 UTC
[rocketmq-clients] branch master updated: Java: polish the part of metrics (#35)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new ba3da92 Java: polish the part of metrics (#35)
ba3da92 is described below
commit ba3da922781f4531c1cc354e24280245bab19109
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 8 16:48:34 2022 +0800
Java: polish the part of metrics (#35)
---
.../rocketmq/client/java/metrics/MessageMeter.java | 169 ++++++++++-----------
.../java/metrics/MetricMessageInterceptor.java | 17 +--
2 files changed, 92 insertions(+), 94 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index c4e6bda..77a7948 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -40,6 +40,7 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.java.impl.ClientImpl;
@@ -53,7 +54,7 @@ public class MessageMeter {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class);
private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3);
-
+ private static final Duration METRIC_READER_INTERVAL = Duration.ofMinutes(1);
private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message";
private final ClientImpl client;
@@ -77,27 +78,10 @@ public class MessageMeter {
this.messageCacheObserver = messageCacheObserver;
}
- DoubleHistogram getSendSuccessCostTimeHistogram() {
- return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME,
- name -> meter.histogramBuilder(name.getName()).build());
- }
-
- DoubleHistogram getProcessCostTimeHistogram() {
- return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME,
- name -> meter.histogramBuilder(name.getName()).build());
- }
-
- DoubleHistogram getMessageDeliveryLatencyHistogram() {
- return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY,
- name -> meter.histogramBuilder(name.getName()).build());
+ DoubleHistogram getHistogramByName(MetricName metricName) {
+ return histogramMap.computeIfAbsent(metricName, name -> meter.histogramBuilder(name.getName()).build());
}
- DoubleHistogram getMessageAwaitTimeHistogram() {
- return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME,
- name -> meter.histogramBuilder(name.getName()).build());
- }
-
- @SuppressWarnings("deprecation")
public synchronized void refresh(Metric metric) {
final String clientId = client.getClientId();
try {
@@ -112,60 +96,16 @@ public class MessageMeter {
clientId);
return;
}
+ final Endpoints existedEndpoints = metricEndpoints;
final Endpoints newMetricEndpoints = optionalEndpoints.get();
if (newMetricEndpoints.equals(metricEndpoints)) {
LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
clientId, newMetricEndpoints);
return;
}
- final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
- .build();
- final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
- .sslContext(sslContext)
- .intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
- final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
- if (null != socketAddresses) {
- IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
- channelBuilder.nameResolverFactory(metricResolverFactory);
- }
- ManagedChannel channel = channelBuilder.build();
-
- OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
- .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT).build();
-
- InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
- final View sendSuccessCostTimeView = View.builder()
- .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
-
- InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
- final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
- .build();
-
- InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
- final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
-
- InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
- final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
-
- PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
- .setInterval(Duration.ofSeconds(1)).build();
- provider = SdkMeterProvider.builder().registerMetricReader(reader)
- .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
- .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
- .registerView(awaitTimeInstrumentSelector, awaitTimeView)
- .registerView(processTimeInstrumentSelector, processTimeView)
- .build();
-
- final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
- meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
- LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", clientId,
- metricEndpoints, newMetricEndpoints);
- this.reset();
- metricEndpoints = newMetricEndpoints;
+ this.reset(newMetricEndpoints);
+ LOGGER.info("Message meter endpoints is updated, clientId={}, {} => {}", clientId, existedEndpoints,
+ newMetricEndpoints);
} catch (Throwable t) {
LOGGER.error("Exception raised while refreshing message meter, clientId={}", clientId, t);
}
@@ -182,19 +122,26 @@ public class MessageMeter {
return Optional.empty();
}
- public void shutdown() {
- if (null != provider) {
- LOGGER.info("Begin to shutdown the message meter, clientId={}", client.getClientId());
- final CountDownLatch latch = new CountDownLatch(1);
- provider.shutdown().whenComplete(latch::countDown);
- try {
- latch.await();
- } catch (Throwable t) {
- LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}",
- client.getClientId());
- }
- LOGGER.info("Shutdown the message meter, clientId={}", client.getClientId());
+ public synchronized void shutdown() {
+ if (null == provider) {
+ return;
+ }
+ final String clientId = client.getClientId();
+ LOGGER.info("Begin to shutdown the message meter, clientId={}", clientId);
+ final CountDownLatch latch = new CountDownLatch(1);
+ provider.shutdown().whenComplete(latch::countDown);
+ try {
+ latch.await();
+ } catch (Throwable t) {
+ LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", clientId);
}
+ LOGGER.info("Shutdown the message meter, clientId={}", clientId);
+ // Clear endpoints.
+ metricEndpoints = null;
+ // Clear meter.
+ meter = null;
+ // Clear provider.
+ provider = null;
}
public Meter getMeter() {
@@ -205,7 +152,55 @@ public class MessageMeter {
return client;
}
- private void reset() {
+ @SuppressWarnings("deprecation")
+ private void reset(Endpoints newMetricEndpoints) throws SSLException {
+ final String clientId = client.getClientId();
+ final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .build();
+ final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
+ .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId));
+ final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
+ if (null != socketAddresses) {
+ IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
+ channelBuilder.nameResolverFactory(metricResolverFactory);
+ }
+ ManagedChannel channel = channelBuilder.build();
+
+ OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
+ .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT).build();
+
+ InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
+ final View sendSuccessCostTimeView = View.builder()
+ .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
+
+ InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
+ final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
+ .build();
+
+ InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
+ final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
+
+ InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
+ final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
+
+ PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
+ .setInterval(METRIC_READER_INTERVAL).build();
+
+ final SdkMeterProvider newProvider = SdkMeterProvider.builder().registerMetricReader(reader)
+ .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
+ .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
+ .registerView(awaitTimeInstrumentSelector, awaitTimeView)
+ .registerView(processTimeInstrumentSelector, processTimeView)
+ .build();
+
+ final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(newProvider).build();
+ meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
+ shutdown();
+ // Force clean existed histogram.
histogramMap.clear();
meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> {
final Optional<String> optionalConsumerGroup = tryGetConsumerGroup();
@@ -217,9 +212,10 @@ public class MessageMeter {
final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount();
for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) {
final String topic = entry.getKey();
- Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+ Attributes attributes = Attributes.builder()
+ .put(MetricLabels.TOPIC, topic)
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+ .put(MetricLabels.CLIENT_ID, clientId).build();
measurement.record(entry.getValue(), attributes);
}
});
@@ -233,11 +229,14 @@ public class MessageMeter {
final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes();
for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) {
final String topic = entry.getKey();
- Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, topic)
+ Attributes attributes = Attributes.builder()
+ .put(MetricLabels.TOPIC, topic)
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+ .put(MetricLabels.CLIENT_ID, clientId).build();
measurement.record(entry.getValue(), attributes);
}
});
+ this.provider = newProvider;
+ this.metricEndpoints = newMetricEndpoints;
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 62fbe55..6f079fc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -45,15 +45,14 @@ public class MetricMessageInterceptor implements MessageInterceptor {
private void doAfterSendMessage(List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- final DoubleHistogram costTimeHistogram = messageMeter.getSendSuccessCostTimeHistogram();
+ final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
for (MessageCommon messageCommon : messageCommons) {
InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
InvocationStatus.FAILURE;
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
- .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
- .build();
- costTimeHistogram.record(duration.toMillis(), attributes);
+ .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build();
+ histogram.record(duration.toMillis(), attributes);
}
}
@@ -77,11 +76,11 @@ public class MetricMessageInterceptor implements MessageInterceptor {
}
final Timestamp deliveryTimestampFromRemote = optionalDeliveryTimestampFromRemote.get();
final long latency = System.currentTimeMillis() - Timestamps.toMillis(deliveryTimestampFromRemote);
- final DoubleHistogram messageDeliveryLatencyHistogram = messageMeter.getMessageDeliveryLatencyHistogram();
+ final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.DELIVERY_LATENCY);
final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
.put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
- messageDeliveryLatencyHistogram.record(latency, attributes);
+ histogram.record(latency, attributes);
}
private void doBeforeConsumeMessage(List<MessageCommon> messageCommons) {
@@ -100,13 +99,13 @@ public class MetricMessageInterceptor implements MessageInterceptor {
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
.put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()).build();
- final DoubleHistogram histogram = messageMeter.getMessageAwaitTimeHistogram();
+ final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.AWAIT_TIME);
histogram.record(durationAfterDecoding.toMillis(), attributes);
}
private void doAfterProcessMessage(List<MessageCommon> messageCommons, Duration duration,
MessageHookPointsStatus status) {
- final DoubleHistogram processCostTimeHistogram = messageMeter.getProcessCostTimeHistogram();
+ final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.PROCESS_TIME);
final ClientImpl client = messageMeter.getClient();
if (!(client instanceof PushConsumer)) {
// Should never reach here.
@@ -122,7 +121,7 @@ public class MetricMessageInterceptor implements MessageInterceptor {
.put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId())
.put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
.build();
- processCostTimeHistogram.record(duration.toMillis(), attributes);
+ histogram.record(duration.toMillis(), attributes);
}
}