You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/27 08:24:38 UTC

[rocketmq-clients] 02/02: Java: amend the bucket boundaries for metrics

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 874d750999dfb3a8992b87ea928f9dd3ebf862e1
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jun 27 16:23:08 2022 +0800

    Java: amend the bucket boundaries for metrics
---
 .../client/java/impl/producer/ProducerImpl.java    |  6 +--
 .../client/java/metrics/HistogramBuckets.java      | 44 ++++++++++++++++++++++
 .../rocketmq/client/java/metrics/MessageMeter.java | 43 ++++++++++++---------
 3 files changed, 73 insertions(+), 20 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index dd8be93..ab711c4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -269,9 +269,9 @@ class ProducerImpl extends ClientImpl implements Producer {
         } catch (Throwable t) {
             throw new ClientException(t);
         }
-        final EndTransactionRequest.Builder builder =
-            EndTransactionRequest.newBuilder().setMessageId(messageId.toString()).setTransactionId(transactionId)
-                .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(messageCommon.getTopic()).build());
+        final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
+            .setMessageId(messageId.toString()).setTransactionId(transactionId)
+            .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(messageCommon.getTopic()).build());
         switch (resolution) {
             case COMMIT:
                 builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java
new file mode 100644
index 0000000..777ab2a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.metrics;
+
+import io.opentelemetry.sdk.metrics.Aggregation;
+import java.util.Arrays;
+
+public class HistogramBuckets {
+    /**
+     * Histogram bucket for {@link MetricName#SEND_SUCCESS_COST_TIME}, time unit is milliseconds.
+     */
+    public static final Aggregation SEND_SUCCESS_COST_TIME_BUCKET =
+        Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0));
+    /**
+     * Histogram bucket for {@link MetricName#DELIVERY_LATENCY}, time unit is milliseconds.
+     */
+    public static final Aggregation DELIVERY_LATENCY_BUCKET = Aggregation.explicitBucketHistogram(Arrays.asList(1.0,
+        5.0, 10.0, 20.0, 50.0, 200.0, 500.0));
+    /**
+     * Histogram bucket for {@link MetricName#AWAIT_TIME}, time unit is milliseconds.
+     */
+    public static final Aggregation AWAIT_TIME_BUCKET = Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+        20.0, 100.0, 1000.0, 5 * 1000.0, 10 * 1000.0));
+    /**
+     * Histogram bucket for {@link MetricName#PROCESS_TIME}, time unit is milliseconds.
+     */
+    public static final Aggregation PROCESS_TIME_BUCKET = Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+        10.0, 100.0, 1000.0, 10 * 1000.0, 60 * 1000.0));
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 12a55d2..06f65fb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -28,7 +28,6 @@ import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
-import io.opentelemetry.sdk.metrics.Aggregation;
 import io.opentelemetry.sdk.metrics.InstrumentSelector;
 import io.opentelemetry.sdk.metrics.InstrumentType;
 import io.opentelemetry.sdk.metrics.SdkMeterProvider;
@@ -36,7 +35,6 @@ import io.opentelemetry.sdk.metrics.View;
 import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -139,10 +137,9 @@ public class MessageMeter {
             }
             final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
                 .build();
-            final NettyChannelBuilder channelBuilder =
-                NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
-                    .sslContext(sslContext)
-                    .intercept(new AuthInterceptor(client.getClientConfiguration(), client.getClientId()));
+            final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget())
+                .sslContext(sslContext)
+                .intercept(new AuthInterceptor(client.getClientConfiguration(), client.getClientId()));
             final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses();
             if (null != socketAddresses) {
                 IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
@@ -150,24 +147,36 @@ public class MessageMeter {
             }
             ManagedChannel channel = channelBuilder.build();
 
-            OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder()
-                .setChannel(channel)
+            OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel)
                 .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT).build();
 
-            InstrumentSelector instrumentSelector = InstrumentSelector.builder()
-                .setType(InstrumentType.HISTOGRAM)
-                .setName("lingchuhistogram")
-                .build();
+            InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
+            final View sendSuccessCostTimeView = View.builder()
+                .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
 
-            final View view = View.builder()
-                .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(0.1d, 1d, 10d, 30d, 60d)))
+            InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
+            final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
                 .build();
 
-            PeriodicMetricReader reader =
-                PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
+            InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
+            final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
+
+            InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
+            final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
+
+            PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
+                .setInterval(Duration.ofSeconds(1)).build();
             provider = SdkMeterProvider.builder().registerMetricReader(reader)
-                .registerView(instrumentSelector, view)
+                .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView)
+                .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView)
+                .registerView(awaitTimeInstrumentSelector, awaitTimeView)
+                .registerView(processTimeInstrumentSelector, processTimeView)
                 .build();
+
             final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
             meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
             LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(),