You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/28 04:16:35 UTC

[skywalking] branch master updated: Adapt otel exponential histogram data (#10449)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new f93a98d4c7 Adapt otel exponential histogram data (#10449)
f93a98d4c7 is described below

commit f93a98d4c7fb2e0a4b2528e98a9f9290635401a7
Author: 叶梦飞 <70...@users.noreply.github.com>
AuthorDate: Tue Feb 28 12:16:22 2023 +0800

    Adapt otel exponential histogram data (#10449)
---
 docs/en/changes/changes.md                         |   1 +
 .../otlp/OpenTelemetryMetricRequestProcessor.java  |  93 ++++++++++++--
 .../OpenTelemetryMetricRequestProcessorTest.java   | 134 +++++++++++++++++++++
 3 files changed, 218 insertions(+), 10 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 8183dd8716..2e71039940 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -104,6 +104,7 @@
 * `Scope` in the Entity of Metrics query v1 protocol is not required and automatical correction. The scope is determined based on the metric itself.
 * Add explicit `ReadTimeout` for ConsulConfigurationWatcher to avoid `IllegalArgumentException: Cache watchInterval=10sec >= networkClientReadTimeout=10000ms`.
 * Fix `DurationUtils.getDurationPoints` exceed, when `startTimeBucket` equals `endTimeBucket`.
+* Support processing OpenTelemetry ExponentialHistogram metrics.
 
 #### UI
 
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java
index e5302472b9..4d3adbbe07 100644
--- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java
@@ -24,12 +24,14 @@ import io.opentelemetry.proto.common.v1.KeyValue;
 import io.opentelemetry.proto.metrics.v1.Sum;
 import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
 import io.vavr.Function1;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Stream;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
@@ -161,6 +163,57 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
         return result;
     }
 
+    /**
+     * Converts an OpenTelemetry ExponentialHistogram data point into histogram buckets keyed by upper bound
+     * (https://opentelemetry.io/docs/reference/specification/metrics/data-model/#exponentialhistogram).
+     * Bounds are derived from scale, offset and bucket index. Firstly, base = 2**(2**(-scale)). Then the
+     * upperBound of the positive bucket at index i is base**(offset+i+1), except that the last positive
+     * bucket is keyed by +Infinity. Negative buckets are mapped by their absolute value using the same scale,
+     * so the upperBound of the negative bucket at index i is -base**(offset+i). When there are no positive
+     * buckets at all, no +Infinity bucket is added.
+     *
+     * The zero_count field is ignored for now, because the zero_threshold could even overlap existing buckets.
+     *
+     * @param positiveOffset       offset of the positive Buckets in ExponentialHistogramDataPoint
+     * @param positiveBucketCounts bucket_counts of the positive Buckets in ExponentialHistogramDataPoint
+     * @param negativeOffset       offset of the negative Buckets in ExponentialHistogramDataPoint
+     * @param negativeBucketCounts bucket_counts of the negative Buckets in ExponentialHistogramDataPoint
+     * @param scale                scale in ExponentialHistogramDataPoint
+     * @return the bucket set of the histogram: the key is a bucket's upperBound, the value is the count
+     * reported for that bucket. Empty when the base or any computed bound overflows to infinity.
+     */
+    private static Map<Double, Long> buildBucketsFromExponentialHistogram(
+        int positiveOffset, final List<Long> positiveBucketCounts,
+        int negativeOffset, final List<Long> negativeBucketCounts, int scale) {
+        final Map<Double, Long> result = new HashMap<>();
+        final double base = Math.pow(2.0, Math.pow(2.0, -scale));
+        if (base == Double.POSITIVE_INFINITY) {
+            log.warn("Receive and reject out-of-range ExponentialHistogram data");
+            return result;
+        }
+        for (int i = 0; i < negativeBucketCounts.size(); i++) {
+            final double upperBound = -Math.pow(base, negativeOffset + i);
+            if (upperBound == Double.NEGATIVE_INFINITY) {
+                log.warn("Receive and reject out-of-range ExponentialHistogram data");
+                return new HashMap<>();
+            }
+            result.put(upperBound, negativeBucketCounts.get(i));
+        }
+        final int lastPositive = positiveBucketCounts.size() - 1; // -1 when there are no positive buckets
+        for (int i = 0; i < lastPositive; i++) {
+            final double upperBound = Math.pow(base, positiveOffset + i + 1);
+            if (upperBound == Double.POSITIVE_INFINITY) {
+                log.warn("Receive and reject out-of-range ExponentialHistogram data");
+                return new HashMap<>();
+            }
+            result.put(upperBound, positiveBucketCounts.get(i));
+        }
+        if (lastPositive >= 0) {
+            result.put(Double.POSITIVE_INFINITY, positiveBucketCounts.get(lastPositive));
+        }
+        return result;
+    }
+
     // Adapt the OpenTelemetry metrics to SkyWalking metrics
     private Stream<? extends Metric> adaptMetrics(
         final Map<String, String> nodeLabels,
@@ -187,16 +240,16 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
             if (sum
                 .getAggregationTemporality() == AGGREGATION_TEMPORALITY_DELTA) {
                 return sum.getDataPointsList().stream()
-                        .map(point -> new Gauge(
-                                metric.getName(),
-                                mergeLabels(
-                                    nodeLabels,
-                                    buildLabels(point.getAttributesList())
-                                ),
-                                point.hasAsDouble() ? point.getAsDouble()
-                                    : point.getAsInt(),
-                                point.getTimeUnixNano() / 1000000
-                        ));
+                          .map(point -> new Gauge(
+                              metric.getName(),
+                              mergeLabels(
+                                  nodeLabels,
+                                  buildLabels(point.getAttributesList())
+                              ),
+                              point.hasAsDouble() ? point.getAsDouble()
+                                  : point.getAsInt(),
+                              point.getTimeUnixNano() / 1000000
+                          ));
             }
             if (sum.getIsMonotonic()) {
                 return sum.getDataPointsList().stream()
@@ -241,6 +294,26 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
                              point.getTimeUnixNano() / 1000000
                          ));
         }
+        if (metric.hasExponentialHistogram()) {
+            return metric.getExponentialHistogram().getDataPointsList().stream()
+                         .map(point -> new Histogram(
+                             metric.getName(),
+                             mergeLabels(
+                                 nodeLabels,
+                                 buildLabels(point.getAttributesList())
+                             ),
+                             point.getCount(),
+                             point.getSum(),
+                             buildBucketsFromExponentialHistogram(
+                                 point.getPositive().getOffset(),
+                                 point.getPositive().getBucketCountsList(),
+                                 point.getNegative().getOffset(),
+                                 point.getNegative().getBucketCountsList(),
+                                 point.getScale()
+                             ),
+                             point.getTimeUnixNano() / 1000000
+                         ));
+        }
         if (metric.hasSummary()) {
             return metric.getSummary().getDataPointsList().stream()
                          .map(point -> new Summary(
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessorTest.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessorTest.java
new file mode 100644
index 0000000000..0a40e3bd21
--- /dev/null
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessorTest.java
@@ -0,0 +1,134 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.otel.otlp;
+
+import io.opentelemetry.proto.metrics.v1.ExponentialHistogram;
+import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
+import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenTelemetryMetricRequestProcessorTest {
+
+    private OtelMetricReceiverConfig config;
+
+    private ModuleManager manager;
+
+    private OpenTelemetryMetricRequestProcessor metricRequestProcessor;
+
+    private Map<String, String> nodeLabels;
+
+    /**
+     * Builds a fresh processor and an empty node label map before each test.
+     */
+    @BeforeEach
+    public void setUp() {
+        nodeLabels = new HashMap<>();
+        manager = new ModuleManager();
+        config = new OtelMetricReceiverConfig();
+        metricRequestProcessor = new OpenTelemetryMetricRequestProcessor(manager, config);
+    }
+
+    /**
+     * Pushes a single ExponentialHistogram data point (scale 2, three positive and three negative buckets)
+     * through the private {@code adaptMetrics} method and checks the name, timestamp, sample count, sample
+     * sum and every bucket of the resulting {@link Histogram}.
+     */
+    @Test
+    public void testAdaptExponentialHistogram()
+        throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        // adaptMetrics is private, so it is resolved and invoked through reflection
+        Method adaptMetrics = OpenTelemetryMetricRequestProcessor.class.getDeclaredMethod(
+            "adaptMetrics", Map.class, Metric.class);
+        adaptMetrics.setAccessible(true);
+
+        // samples: 4; 7, 7.5; 8.5, 8.7, 9.4
+        var positive = ExponentialHistogramDataPoint.Buckets.newBuilder()
+            .setOffset(10)
+            .addBucketCounts(1) // (0, 6.72]
+            .addBucketCounts(2) // (6.72, 8]
+            .addBucketCounts(3) // (8, 9.51]
+            .build();
+        // samples: -14, -14.5, -15; -18; -21, -26
+        var negative = ExponentialHistogramDataPoint.Buckets.newBuilder()
+            .setOffset(15)
+            .addBucketCounts(3) // (-16, -13.45]
+            .addBucketCounts(1) // (-19.02, -16]
+            .addBucketCounts(2) // (-INFINITY, -19.02]
+            .build();
+        var point = ExponentialHistogramDataPoint.newBuilder()
+            .setCount(12)
+            .setSum(-63.4)
+            .setScale(2)
+            .setPositive(positive)
+            .setNegative(negative)
+            .setTimeUnixNano(1000000)
+            .build();
+        Metric metric = Metric.newBuilder()
+            .setName("test_metric")
+            .setExponentialHistogram(
+                ExponentialHistogram.newBuilder().addDataPoints(point).build())
+            .build();
+
+        Stream<Histogram> adapted = (Stream<Histogram>) adaptMetrics.invoke(
+            metricRequestProcessor, nodeLabels, metric);
+        List<Histogram> histograms = adapted.collect(Collectors.toList());
+        Histogram histogram = histograms.get(0);
+
+        // name, timestamp (1000000 ns is expected as 1), sample count and sample sum
+        assertEquals("test_metric", histogram.getName());
+        assertEquals(1, histogram.getTimestamp());
+        assertEquals(12, histogram.getSampleCount());
+        assertEquals(-63.4, histogram.getSampleSum());
+
+        // every bucket must be keyed by its expected upper bound and hold its expected count
+        final double base = Math.pow(2, Math.pow(2, -2));
+        assertBucket(histogram, Math.pow(base, 11), 1);
+        assertBucket(histogram, Math.pow(base, 12), 2);
+        assertBucket(histogram, Double.POSITIVE_INFINITY, 3);
+        assertBucket(histogram, -Math.pow(base, 15), 3);
+        assertBucket(histogram, -Math.pow(base, 16), 1);
+        assertBucket(histogram, -Math.pow(base, 17), 2);
+    }
+
+    /**
+     * Asserts that the histogram has a bucket keyed by {@code upperBound} that holds {@code expectedCount}.
+     *
+     * @param histogram     the adapted histogram under test
+     * @param upperBound    the expected bucket key
+     * @param expectedCount the expected value stored under that key
+     */
+    private static void assertBucket(
+        final Histogram histogram, final double upperBound, final int expectedCount) {
+        assertTrue(histogram.getBuckets().containsKey(upperBound));
+        assertEquals(expectedCount, histogram.getBuckets().get(upperBound));
+    }
+}