You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/27 08:24:38 UTC
[rocketmq-clients] 02/02: Java: amend the bucket boundaries for metrics
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 874d750999dfb3a8992b87ea928f9dd3ebf862e1
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jun 27 16:23:08 2022 +0800
Java: amend the bucket boundaries for metrics
---
.../client/java/impl/producer/ProducerImpl.java | 6 +--
.../client/java/metrics/HistogramBuckets.java | 44 ++++++++++++++++++++++
.../rocketmq/client/java/metrics/MessageMeter.java | 43 ++++++++++++---------
3 files changed, 73 insertions(+), 20 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index dd8be93..ab711c4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -269,9 +269,9 @@ class ProducerImpl extends ClientImpl implements Producer {
} catch (Throwable t) {
throw new ClientException(t);
}
- final EndTransactionRequest.Builder builder =
- EndTransactionRequest.newBuilder().setMessageId(messageId.toString()).setTransactionId(transactionId)
- .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(messageCommon.getTopic()).build());
+ final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
+ .setMessageId(messageId.toString()).setTransactionId(transactionId)
+ .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(messageCommon.getTopic()).build());
switch (resolution) {
case COMMIT:
builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java
new file mode 100644
index 0000000..777ab2a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import io.opentelemetry.sdk.metrics.Aggregation;
+import java.util.Arrays;
+
+public class HistogramBuckets {
+ /**
+ * Explicit bucket boundaries for {@link MetricName#SEND_SUCCESS_COST_TIME} histograms, in milliseconds.
+ */
+ public static final Aggregation SEND_SUCCESS_COST_TIME_BUCKET =
+ Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0));
+ /**
+ * Explicit bucket boundaries for {@link MetricName#DELIVERY_LATENCY} histograms, in milliseconds.
+ */
+ public static final Aggregation DELIVERY_LATENCY_BUCKET = Aggregation.explicitBucketHistogram(Arrays.asList(1.0,
+ 5.0, 10.0, 20.0, 50.0, 200.0, 500.0));
+ /**
+ * Explicit bucket boundaries for {@link MetricName#AWAIT_TIME} histograms, in milliseconds (up to 10 seconds).
+ */
+ public static final Aggregation AWAIT_TIME_BUCKET = Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+ 20.0, 100.0, 1000.0, 5 * 1000.0, 10 * 1000.0));
+ /**
+ * Explicit bucket boundaries for {@link MetricName#PROCESS_TIME} histograms, in milliseconds (up to 60 seconds).
+ */
+ public static final Aggregation PROCESS_TIME_BUCKET = Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+ 10.0, 100.0, 1000.0, 10 * 1000.0, 60 * 1000.0));
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 12a55d2..06f65fb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -28,7 +28,6 @@ import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
-import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
@@ -36,7 +35,6 @@ import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import java.net.InetSocketAddress;
import java.time.Duration;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -139,10 +137,9 @@ public class MessageMeter {
}
final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
- final NettyChannelBuilder channelBuilder =
- NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
- .sslContext(sslContext)
- .intercept(new AuthInterceptor(client.getClientConfiguration(), client.getClientId()));
+ final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
+ .sslContext(sslContext)
+ .intercept(new AuthInterceptor(client.getClientConfiguration(), client.getClientId()));
final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
if (null != socketAddresses) {
IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
@@ -150,24 +147,36 @@ public class MessageMeter {
}
ManagedChannel channel = channelBuilder.build();
- OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder()
- .setChannel(channel)
+ OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
.setTimeout(METRIC_EXPORTER_RPC_TIMEOUT).build();
- InstrumentSelector instrumentSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM)
- .setName("lingchuhistogram")
- .build();
+ InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
+ final View sendSuccessCostTimeView = View.builder()
+ .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
- final View view = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(0.1d, 1d, 10d, 30d, 60d)))
+ InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
+ final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
.build();
- PeriodicMetricReader reader =
- PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
+ InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
+ final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
+
+ InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
+ final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
+
+ PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
+ .setInterval(Duration.ofSeconds(1)).build();
provider = SdkMeterProvider.builder().registerMetricReader(reader)
- .registerView(instrumentSelector, view)
+ .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
+ .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
+ .registerView(awaitTimeInstrumentSelector, awaitTimeView)
+ .registerView(processTimeInstrumentSelector, processTimeView)
.build();
+
final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(),