You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/05 09:47:56 UTC
[skywalking] branch master updated: Polish aws-firehose-receiver to adapt existing OTEL proto (#10343)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new b991cfd125 Polish aws-firehose-receiver to adapt existing OTEL proto (#10343)
b991cfd125 is described below
commit b991cfd125da0a2597cf299a38e40b4fa5ec16d3
Author: pg.yang <pg...@hotmail.com>
AuthorDate: Sun Feb 5 17:47:48 2023 +0800
Polish aws-firehose-receiver to adapt existing OTEL proto (#10343)
---
LICENSE | 1 +
docs/en/setup/backend/aws-firehose-receiver.md | 2 +-
.../aws-firehose-receiver/pom.xml | 31 +
.../receiver/aws/firehose/FirehoseHTTPHandler.java | 6 +-
.../aws/firehose/OtelMetricsConvertor.java | 344 +++++++++++
.../proto/opentelemetry/proto/collector/README.md | 9 +
.../collector/metrics/v1/metrics_service.proto | 45 ++
.../collector/metrics/v1/metrics_service_http.yaml | 9 +
.../opentelemetry/proto/common/v1/common.proto | 78 +++
.../proto/metrics/experimental/configservice.proto | 102 ++++
.../opentelemetry/proto/metrics/v1/metrics.proto | 636 +++++++++++++++++++++
.../opentelemetry/proto/resource/v1/resource.proto | 34 ++
.../aws/firehose/OtelMetricsConvertorTest.java | 90 +++
.../convertor-test-data/s3-data-1/expect.json | 132 +++++
.../convertor-test-data/s3-data-1/source.json | 116 ++++
.../convertor-test-data/s3-data-2/expect.json | 285 +++++++++
.../convertor-test-data/s3-data-2/source.json | 245 ++++++++
17 files changed, 2161 insertions(+), 4 deletions(-)
diff --git a/LICENSE b/LICENSE
index b073d89776..1429067582 100644
--- a/LICENSE
+++ b/LICENSE
@@ -220,6 +220,7 @@ The text of each license is the standard Apache 2.0 license.
proto files from opencensus: https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-go Apache 2.0
proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
proto files from opentelemetry: https://github.com/open-telemetry/opentelemetry-proto/tree/main/opentelemetry/proto Apache 2.0
+ proto files from opentelemetry: https://github.com/open-telemetry/opentelemetry-proto/tree/v0.7.0 Apache 2.0
flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 2.0
mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
svg files from skywalking-ui/src/assets/icons: https://github.com/google/material-design-icons Apache 2.0
diff --git a/docs/en/setup/backend/aws-firehose-receiver.md b/docs/en/setup/backend/aws-firehose-receiver.md
index 43c27df277..e422c46f4d 100644
--- a/docs/en/setup/backend/aws-firehose-receiver.md
+++ b/docs/en/setup/backend/aws-firehose-receiver.md
@@ -5,7 +5,7 @@ You could leverage the receiver to collect [AWS CloudWatch metrics](https://docs
## Setup(S3 example)
-1. Create CloudWatch metrics configuration for S3 (refer to [S3 CloudWatch metrics](https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-request-metrics-bucket.html))
+1. Create CloudWatch metrics configuration for S3 (refer to [S3 CloudWatch metrics](https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-request-metrics-bucket.html))
2. Stream CloudWatch metrics to AWS Kinesis Data Firehose delivery stream by [CloudWatch metrics stream](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-setup-datalake.html)
3. Specify AWS Kinesis Data Firehose delivery stream HTTP Endpoint (refer to [Choose HTTP Endpoint for Your Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http))
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml b/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
index 1b86b698f9..03b9335a2a 100644
--- a/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
@@ -38,4 +38,35 @@
<version>${project.version}</version>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>${protobuf-maven-plugin.version}</version>
+ <configuration>
+ <!--
+ The version of protoc must match protobuf-java. If you don't depend on
+ protobuf-java directly, you will be transitively depending on the
+ protobuf-java version that grpc depends on.
+ -->
+ <protocArtifact>com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier}
+ </protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.42.1:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
index 760630dd06..9a11018a94 100644
--- a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
@@ -23,7 +23,7 @@ import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.Post;
import com.linecorp.armeria.server.annotation.ProducesJson;
-import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
import java.io.ByteArrayInputStream;
import java.util.Base64;
import lombok.AllArgsConstructor;
@@ -33,7 +33,6 @@ import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRe
@Slf4j
@AllArgsConstructor
public class FirehoseHTTPHandler {
-
private final OpenTelemetryMetricRequestProcessor openTelemetryMetricRequestProcessor;
@Post("/aws/firehose/metrics")
@@ -46,7 +45,8 @@ public class FirehoseHTTPHandler {
Base64.getDecoder().decode(record.getData()));
ExportMetricsServiceRequest request;
while ((request = ExportMetricsServiceRequest.parseDelimitedFrom(byteArrayInputStream)) != null) {
- openTelemetryMetricRequestProcessor.processMetricsRequest(request);
+ openTelemetryMetricRequestProcessor.processMetricsRequest(
+ OtelMetricsConvertor.convertExportMetricsRequest(request));
}
}
} catch (InvalidProtocolBufferException e) {
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertor.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertor.java
new file mode 100644
index 0000000000..0713680f2e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertor.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.common.firehose.v0_7.ArrayValue;
+import io.opentelemetry.proto.common.firehose.v0_7.KeyValue;
+import io.opentelemetry.proto.common.firehose.v0_7.KeyValueList;
+import io.opentelemetry.proto.common.firehose.v0_7.StringKeyValue;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleGauge;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleHistogram;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleHistogramDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSum;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummary;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummaryDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.InstrumentationLibraryMetrics;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntGauge;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntHistogram;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntHistogramDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntSum;
+import io.opentelemetry.proto.metrics.firehose.v0_7.Metric;
+import io.opentelemetry.proto.metrics.firehose.v0_7.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.DataPointFlags;
+import io.opentelemetry.proto.metrics.v1.Gauge;
+import io.opentelemetry.proto.metrics.v1.Histogram;
+import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+import io.opentelemetry.proto.metrics.v1.Sum;
+import io.opentelemetry.proto.metrics.v1.Summary;
+import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
+import io.opentelemetry.proto.resource.v1.Resource;
+import java.util.Optional;
+
+/**
+ * Converts metrics decoded with the OpenTelemetry proto v0.7 classes (generated into the
+ * {@code *.firehose.v0_7} packages of this module) into the {@code v1} proto classes accepted by
+ * the existing OpenTelemetry metric request processor.
+ *
+ * <p>All methods are stateless. Fields that have no counterpart handled here (exemplars,
+ * histogram min/max, instrumentation library name/version) are not carried over.
+ */
+public class OtelMetricsConvertor {
+
+    /**
+     * Converts a whole v0.7 export request into its v1 equivalent.
+     *
+     * @param sourceRequest request parsed with the v0.7 proto classes
+     * @return a v1 request holding one converted {@code ResourceMetrics} per source entry
+     * @throws UnsupportedOperationException if a sum or histogram carries an aggregation
+     *                                       temporality that is not known to the v0.7 enum
+     */
+    public static io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest convertExportMetricsRequest(
+        ExportMetricsServiceRequest sourceRequest) {
+        final io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest.Builder targetRequestBuilder =
+            io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest.newBuilder();
+        sourceRequest.getResourceMetricsList()
+                     .stream()
+                     .map(OtelMetricsConvertor::convertResourceMetrics)
+                     .forEach(targetRequestBuilder::addResourceMetrics);
+        return targetRequestBuilder.build();
+    }
+
+    /**
+     * Converts the resource and every instrumentation-library metrics group of one
+     * v0.7 {@code ResourceMetrics}; each group becomes one v1 {@code ScopeMetrics}.
+     */
+    private static io.opentelemetry.proto.metrics.v1.ResourceMetrics convertResourceMetrics(final ResourceMetrics resourceMetrics) {
+        final io.opentelemetry.proto.metrics.v1.ResourceMetrics.Builder builder =
+            io.opentelemetry.proto.metrics.v1.ResourceMetrics.newBuilder();
+        builder.setResource(convertResource(resourceMetrics.getResource()));
+        resourceMetrics.getInstrumentationLibraryMetricsList()
+                       .stream()
+                       .map(OtelMetricsConvertor::convertScopeMetrics)
+                       .forEach(builder::addScopeMetrics);
+        return builder.build();
+    }
+
+    /**
+     * Converts the metrics of one v0.7 {@code InstrumentationLibraryMetrics} into a v1 {@code ScopeMetrics}.
+     * Only the metric list is copied; the instrumentation library itself is not mapped to a scope.
+     */
+    private static ScopeMetrics convertScopeMetrics(final InstrumentationLibraryMetrics instrumentationLibraryMetrics) {
+        final ScopeMetrics.Builder builder = ScopeMetrics.newBuilder();
+        instrumentationLibraryMetrics.getMetricsList()
+                                     .stream()
+                                     .map(OtelMetricsConvertor::convertMetrics)
+                                     .forEach(builder::addMetrics);
+        return builder.build();
+    }
+
+    /**
+     * Converts a single v0.7 metric.
+     *
+     * <p>The payload of a metric is a {@code oneof}: at most one of the typed fields is populated.
+     * Protobuf getters never return {@code null} (an unset field yields the default instance), so
+     * each conversion is gated on the corresponding {@code hasXxx()} check. Converting
+     * unconditionally would overwrite the target {@code oneof} on every call and leave only the
+     * last one written (the summary), discarding real gauge/sum/histogram data.
+     */
+    private static io.opentelemetry.proto.metrics.v1.Metric convertMetrics(final Metric metric) {
+        final io.opentelemetry.proto.metrics.v1.Metric.Builder builder = io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
+        builder.setName(metric.getName());
+        builder.setDescription(metric.getDescription());
+        builder.setUnit(metric.getUnit());
+
+        final Optional<Metric> source = Optional.of(metric);
+        source.filter(Metric::hasDoubleGauge)
+              .map(Metric::getDoubleGauge)
+              .map(OtelMetricsConvertor::convertDoubleGauge)
+              .ifPresent(builder::setGauge);
+        source.filter(Metric::hasIntGauge)
+              .map(Metric::getIntGauge)
+              .map(OtelMetricsConvertor::convertIntGauge)
+              .ifPresent(builder::setGauge);
+        source.filter(Metric::hasDoubleHistogram)
+              .map(Metric::getDoubleHistogram)
+              .map(OtelMetricsConvertor::convertDoubleHistogram)
+              .ifPresent(builder::setHistogram);
+        source.filter(Metric::hasIntHistogram)
+              .map(Metric::getIntHistogram)
+              .map(OtelMetricsConvertor::convertIntHistogram)
+              .ifPresent(builder::setHistogram);
+        source.filter(Metric::hasDoubleSum)
+              .map(Metric::getDoubleSum)
+              .map(OtelMetricsConvertor::convertDoubleSum)
+              .ifPresent(builder::setSum);
+        source.filter(Metric::hasIntSum)
+              .map(Metric::getIntSum)
+              .map(OtelMetricsConvertor::convertIntSum)
+              .ifPresent(builder::setSum);
+        source.filter(Metric::hasDoubleSummary)
+              .map(Metric::getDoubleSummary)
+              .map(OtelMetricsConvertor::convertDoubleSummary)
+              .ifPresent(builder::setSummary);
+        return builder.build();
+    }
+
+    /**
+     * Converts a v0.7 DoubleSummary into a v1 Summary by converting each data point.
+     */
+    private static Summary convertDoubleSummary(final DoubleSummary doubleSummary) {
+        final Summary.Builder builder = Summary.newBuilder();
+        doubleSummary.getDataPointsList()
+                     .stream()
+                     .map(OtelMetricsConvertor::convertDoubleSummaryDatapoint)
+                     .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Converts a v0.7 DoubleSummaryDataPoint into a v1 SummaryDataPoint.
+     * String labels become string-valued attributes; the flags field is set to "none".
+     */
+    private static SummaryDataPoint convertDoubleSummaryDatapoint(final DoubleSummaryDataPoint doubleSummaryDataPoint) {
+        final SummaryDataPoint.Builder builder = SummaryDataPoint.newBuilder();
+        doubleSummaryDataPoint.getLabelsList()
+                              .stream()
+                              .map(OtelMetricsConvertor::convertStringKV)
+                              .forEach(builder::addAttributes);
+        builder.setStartTimeUnixNano(doubleSummaryDataPoint.getStartTimeUnixNano());
+        builder.setTimeUnixNano(doubleSummaryDataPoint.getTimeUnixNano());
+        builder.setCount(doubleSummaryDataPoint.getCount());
+        builder.setSum(doubleSummaryDataPoint.getSum());
+        builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
+        doubleSummaryDataPoint.getQuantileValuesList()
+                              .stream()
+                              .map(OtelMetricsConvertor::convertValueAtQuantile)
+                              .forEach(builder::addQuantileValues);
+        return builder.build();
+    }
+
+    /**
+     * Copies the quantile and its value from the v0.7 message into the v1 message.
+     */
+    private static SummaryDataPoint.ValueAtQuantile convertValueAtQuantile(final DoubleSummaryDataPoint.ValueAtQuantile valueAtQuantile) {
+        return SummaryDataPoint.ValueAtQuantile.newBuilder()
+                                               .setQuantile(valueAtQuantile.getQuantile())
+                                               .setValue(valueAtQuantile.getValue())
+                                               .build();
+    }
+
+    /**
+     * Converts a v0.7 IntSum into a v1 Sum.
+     * The aggregation temporality and the monotonic flag are carried over so that consumers can
+     * still tell cumulative counters apart from deltas and non-monotonic sums.
+     */
+    private static Sum convertIntSum(final IntSum intSum) {
+        final Sum.Builder builder = Sum.newBuilder();
+        builder.setAggregationTemporality(convertAggregationTemporality(intSum.getAggregationTemporality()));
+        builder.setIsMonotonic(intSum.getIsMonotonic());
+        intSum.getDataPointsList()
+              .stream()
+              .map(OtelMetricsConvertor::convertIntDataPoint)
+              .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Converts a v0.7 DoubleSum into a v1 Sum.
+     * The aggregation temporality and the monotonic flag are carried over so that consumers can
+     * still tell cumulative counters apart from deltas and non-monotonic sums.
+     */
+    private static Sum convertDoubleSum(final DoubleSum doubleSum) {
+        final Sum.Builder builder = Sum.newBuilder();
+        builder.setAggregationTemporality(convertAggregationTemporality(doubleSum.getAggregationTemporality()));
+        builder.setIsMonotonic(doubleSum.getIsMonotonic());
+        doubleSum.getDataPointsList()
+                 .stream()
+                 .map(OtelMetricsConvertor::convertDoubleDataPoint)
+                 .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Convert DoubleDataPoint in OTEL 0.7 to NumberDataPoint
+     * Notice this method ignores the Exemplar field of the data point
+     */
+    private static NumberDataPoint convertDoubleDataPoint(final DoubleDataPoint doubleDataPoint) {
+        final NumberDataPoint.Builder builder = NumberDataPoint.newBuilder();
+        doubleDataPoint.getLabelsList()
+                       .stream()
+                       .map(OtelMetricsConvertor::convertStringKV)
+                       .forEach(builder::addAttributes);
+        builder.setStartTimeUnixNano(doubleDataPoint.getStartTimeUnixNano());
+        builder.setTimeUnixNano(doubleDataPoint.getTimeUnixNano());
+        builder.setAsDouble(doubleDataPoint.getValue());
+        return builder.build();
+    }
+
+    /**
+     * Convert IntDataPoint in OTEL 0.7 to NumberDataPoint
+     * Notice this method ignores the Exemplar field of the data point
+     */
+    private static NumberDataPoint convertIntDataPoint(final IntDataPoint intDataPoint) {
+        final NumberDataPoint.Builder builder = NumberDataPoint.newBuilder();
+        intDataPoint.getLabelsList()
+                    .stream()
+                    .map(OtelMetricsConvertor::convertStringKV)
+                    .forEach(builder::addAttributes);
+        builder.setStartTimeUnixNano(intDataPoint.getStartTimeUnixNano());
+        builder.setTimeUnixNano(intDataPoint.getTimeUnixNano());
+        builder.setAsInt(intDataPoint.getValue());
+        return builder.build();
+    }
+
+    /**
+     * Turns a v0.7 string label into a v1 attribute whose value is a string AnyValue.
+     */
+    private static io.opentelemetry.proto.common.v1.KeyValue convertStringKV(final StringKeyValue stringKeyValue) {
+        final AnyValue.Builder value = AnyValue.newBuilder().setStringValue(stringKeyValue.getValue());
+        return io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
+                                                        .setKey(stringKeyValue.getKey())
+                                                        .setValue(value)
+                                                        .build();
+    }
+
+    /**
+     * Converts a v0.7 IntHistogram into a v1 Histogram, including its aggregation temporality.
+     */
+    private static Histogram convertIntHistogram(final IntHistogram intHistogram) {
+        final Histogram.Builder builder = Histogram.newBuilder();
+        builder.setAggregationTemporality(convertAggregationTemporality(intHistogram.getAggregationTemporality()));
+        intHistogram.getDataPointsList()
+                    .stream()
+                    .map(OtelMetricsConvertor::convertIntHistogramDataPoint)
+                    .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Convert IntHistogramDataPoint in OTEL 0.7 to HistogramDataPoint
+     * Notice this method ignore min, max, Exemplar fields in HistogramDataPoint
+     */
+    private static HistogramDataPoint convertIntHistogramDataPoint(final IntHistogramDataPoint intHistogramDataPoint) {
+        final HistogramDataPoint.Builder builder = HistogramDataPoint.newBuilder();
+        intHistogramDataPoint.getLabelsList()
+                             .stream()
+                             .map(OtelMetricsConvertor::convertStringKV)
+                             .forEach(builder::addAttributes);
+        builder.setStartTimeUnixNano(intHistogramDataPoint.getStartTimeUnixNano());
+        builder.setTimeUnixNano(intHistogramDataPoint.getTimeUnixNano());
+        builder.setCount(intHistogramDataPoint.getCount());
+        builder.setSum(intHistogramDataPoint.getSum());
+        // Copy every per-bucket count; the number of buckets is not itself a bucket value.
+        builder.addAllBucketCounts(intHistogramDataPoint.getBucketCountsList());
+        builder.addAllExplicitBounds(intHistogramDataPoint.getExplicitBoundsList());
+        builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
+        return builder.build();
+    }
+
+    /**
+     * Convert DoubleHistogramDataPoint in OTEL 0.7 to HistogramDataPoint
+     * Notice this method ignore min, max, Exemplar fields in HistogramDataPoint
+     */
+    private static HistogramDataPoint convertDoubleHistogramDataPoint(final DoubleHistogramDataPoint doubleHistogramDataPoint) {
+        final HistogramDataPoint.Builder builder = HistogramDataPoint.newBuilder();
+        doubleHistogramDataPoint.getLabelsList()
+                                .stream()
+                                .map(OtelMetricsConvertor::convertStringKV)
+                                .forEach(builder::addAttributes);
+        builder.setStartTimeUnixNano(doubleHistogramDataPoint.getStartTimeUnixNano());
+        builder.setTimeUnixNano(doubleHistogramDataPoint.getTimeUnixNano());
+        builder.setCount(doubleHistogramDataPoint.getCount());
+        builder.setSum(doubleHistogramDataPoint.getSum());
+        // Copy every per-bucket count; the number of buckets is not itself a bucket value.
+        builder.addAllBucketCounts(doubleHistogramDataPoint.getBucketCountsList());
+        builder.addAllExplicitBounds(doubleHistogramDataPoint.getExplicitBoundsList());
+        builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
+        return builder.build();
+    }
+
+    /**
+     * Maps the v0.7 aggregation temporality enum onto the v1 enum of the same name.
+     *
+     * @throws UnsupportedOperationException for any value other than UNSPECIFIED, CUMULATIVE or DELTA
+     */
+    private static AggregationTemporality convertAggregationTemporality(final io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality aggregationTemporality) {
+        if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_UNSPECIFIED) {
+            return AggregationTemporality.AGGREGATION_TEMPORALITY_UNSPECIFIED;
+        }
+        if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE) {
+            return AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
+        }
+        if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) {
+            return AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
+        }
+        throw new UnsupportedOperationException("Can't convert " + aggregationTemporality);
+    }
+
+    /**
+     * Converts a v0.7 DoubleHistogram into a v1 Histogram, including its aggregation temporality.
+     */
+    private static Histogram convertDoubleHistogram(final DoubleHistogram doubleHistogram) {
+        final Histogram.Builder builder = Histogram.newBuilder();
+        builder.setAggregationTemporality(convertAggregationTemporality(doubleHistogram.getAggregationTemporality()));
+        doubleHistogram.getDataPointsList()
+                       .stream()
+                       .map(OtelMetricsConvertor::convertDoubleHistogramDataPoint)
+                       .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Converts a v0.7 IntGauge into a v1 Gauge by converting each data point.
+     */
+    private static Gauge convertIntGauge(final IntGauge intGauge) {
+        final Gauge.Builder builder = Gauge.newBuilder();
+        intGauge.getDataPointsList()
+                .stream()
+                .map(OtelMetricsConvertor::convertIntDataPoint)
+                .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Converts a v0.7 DoubleGauge into a v1 Gauge by converting each data point.
+     */
+    private static Gauge convertDoubleGauge(final DoubleGauge doubleGauge) {
+        final Gauge.Builder builder = Gauge.newBuilder();
+        doubleGauge.getDataPointsList()
+                   .stream()
+                   .map(OtelMetricsConvertor::convertDoubleDataPoint)
+                   .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Converts the attributes of a v0.7 Resource into a v1 Resource.
+     */
+    private static Resource convertResource(final io.opentelemetry.proto.resource.firehose.v0_7.Resource resource) {
+        final Resource.Builder builder = Resource.newBuilder();
+        resource.getAttributesList()
+                .stream()
+                .map(OtelMetricsConvertor::convertKeyValue)
+                .forEach(builder::addAttributes);
+        return builder.build();
+    }
+
+    /**
+     * Converts a v0.7 AnyValue into a v1 AnyValue, recursing into arrays and key-value lists.
+     * The value is a {@code oneof}, so at most one branch applies; when none is set the
+     * returned AnyValue is empty as well.
+     */
+    private static AnyValue convertAnyValue(final io.opentelemetry.proto.common.firehose.v0_7.AnyValue value) {
+        final AnyValue.Builder builder = AnyValue.newBuilder();
+        if (value.hasStringValue()) {
+            builder.setStringValue(value.getStringValue());
+        } else if (value.hasBoolValue()) {
+            builder.setBoolValue(value.getBoolValue());
+        } else if (value.hasIntValue()) {
+            builder.setIntValue(value.getIntValue());
+        } else if (value.hasDoubleValue()) {
+            builder.setDoubleValue(value.getDoubleValue());
+        } else if (value.hasArrayValue()) {
+            builder.setArrayValue(convertArrayValue(value.getArrayValue()));
+        } else if (value.hasKvlistValue()) {
+            builder.setKvlistValue(convertKvlistValue(value.getKvlistValue()));
+        }
+        return builder.build();
+    }
+
+    /**
+     * Converts every entry of a v0.7 KeyValueList into the v1 KeyValueList.
+     */
+    private static io.opentelemetry.proto.common.v1.KeyValueList convertKvlistValue(final KeyValueList keyValueList) {
+        final io.opentelemetry.proto.common.v1.KeyValueList.Builder builder = io.opentelemetry.proto.common.v1.KeyValueList.newBuilder();
+        keyValueList.getValuesList().stream().map(OtelMetricsConvertor::convertKeyValue).forEach(builder::addValues);
+        return builder.build();
+    }
+
+    /**
+     * Converts a v0.7 KeyValue, keeping the key and converting the value.
+     */
+    private static io.opentelemetry.proto.common.v1.KeyValue convertKeyValue(final KeyValue keyValue) {
+        return io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
+                                                        .setKey(keyValue.getKey())
+                                                        .setValue(convertAnyValue(keyValue.getValue()))
+                                                        .build();
+    }
+
+    /**
+     * Converts every element of a v0.7 ArrayValue into the v1 ArrayValue.
+     */
+    private static io.opentelemetry.proto.common.v1.ArrayValue convertArrayValue(final ArrayValue arrayValue) {
+        final io.opentelemetry.proto.common.v1.ArrayValue.Builder builder = io.opentelemetry.proto.common.v1.ArrayValue.newBuilder();
+        arrayValue.getValuesList().stream().map(OtelMetricsConvertor::convertAnyValue).forEach(builder::addValues);
+        return builder.build();
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/README.md b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/README.md
new file mode 100644
index 0000000000..4a73a31ed8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/README.md
@@ -0,0 +1,9 @@
+# OpenTelemetry Collector Proto
+
+This package describes the OpenTelemetry collector protocol.
+
+## Packages
+
+1. `common` package contains the common messages shared between different services.
+2. `trace` package contains the Trace Service protos.
+3. `metrics` package contains the Metrics Service protos.
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto
new file mode 100644
index 0000000000..6989cf01ff
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto
@@ -0,0 +1,45 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.collector.metrics.v1;
+
+import "opentelemetry/proto/metrics/v1/metrics.proto";
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.collector.metrics.firehose.v0_7";
+option java_outer_classname = "MetricsServiceProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1";
+
+// Service that can be used to push metrics between one Application
+// instrumented with OpenTelemetry and a collector, or between a collector and a
+// central collector.
+service MetricsService {
+ // For performance reasons, it is recommended to keep this RPC
+ // alive for the entire life of the application.
+ rpc Export(ExportMetricsServiceRequest) returns (ExportMetricsServiceResponse) {}
+}
+
+message ExportMetricsServiceRequest {
+ // An array of ResourceMetrics.
+ // For data coming from a single resource this array will typically contain one
+ // element. Intermediary nodes (such as OpenTelemetry Collector) that receive
+ // data from multiple origins typically batch the data before forwarding further and
+ // in that case this array will contain multiple elements.
+ repeated opentelemetry.proto.metrics.v1.ResourceMetrics resource_metrics = 1;
+}
+
+message ExportMetricsServiceResponse {
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service_http.yaml b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service_http.yaml
new file mode 100644
index 0000000000..a545650260
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service_http.yaml
@@ -0,0 +1,9 @@
+# This is an API configuration to generate an HTTP/JSON -> gRPC gateway for the
+# OpenTelemetry service using github.com/grpc-ecosystem/grpc-gateway.
+type: google.api.Service
+config_version: 3
+http:
+ rules:
+ - selector: opentelemetry.proto.collector.metrics.v1.MetricsService.Export
+ post: /v1/metrics
+ body: "*"
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/common/v1/common.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/common/v1/common.proto
new file mode 100644
index 0000000000..340c63105f
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/common/v1/common.proto
@@ -0,0 +1,78 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.common.v1;
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.common.firehose.v0_7";
+option java_outer_classname = "CommonProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1";
+
+// AnyValue is used to represent any type of attribute value. AnyValue may contain a
+// primitive value such as a string or integer or it may contain an arbitrary nested
+// object containing arrays, key-value lists and primitives.
+message AnyValue {
+ // The value is one of the listed fields. It is valid for all values to be unspecified
+ // in which case this AnyValue is considered to be "null".
+ oneof value {
+ string string_value = 1;
+ bool bool_value = 2;
+ int64 int_value = 3;
+ double double_value = 4;
+ ArrayValue array_value = 5;
+ KeyValueList kvlist_value = 6;
+ }
+}
+
+// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message
+// since oneof in AnyValue does not allow repeated fields.
+message ArrayValue {
+ // Array of values. The array may be empty (contain 0 elements).
+ repeated AnyValue values = 1;
+}
+
+// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message
+// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need
+// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to
+// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches
+// are semantically equivalent.
+message KeyValueList {
+ // A collection of key/value pairs. The list may be empty (may
+ // contain 0 elements).
+ repeated KeyValue values = 1;
+}
+
+// KeyValue is a key-value pair that is used to store Span attributes, Link
+// attributes, etc.
+message KeyValue {
+ string key = 1;
+ AnyValue value = 2;
+}
+
+// StringKeyValue is a pair of key/value strings. This is the simpler (and faster) version
+// of KeyValue that only supports string values.
+message StringKeyValue {
+ string key = 1;
+ string value = 2;
+}
+
+// InstrumentationLibrary is a message representing the instrumentation library information
+// such as the fully qualified name and version.
+message InstrumentationLibrary {
+ // An empty instrumentation library name means the name is unknown.
+ string name = 1;
+ string version = 2;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/experimental/configservice.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/experimental/configservice.proto
new file mode 100644
index 0000000000..a6415b5066
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/experimental/configservice.proto
@@ -0,0 +1,102 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.metrics.experimental;
+
+import "opentelemetry/proto/resource/v1/resource.proto";
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.metrics.firehose.v0_7.experimental";
+option java_outer_classname = "MetricConfigServiceProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/experimental";
+
+// MetricConfig is a service that enables updating metric schedules, trace
+// parameters, and other configurations on the SDK without having to restart the
+// instrumented application. The collector can also serve as the configuration
+// service, acting as a bridge between third-party configuration services and
+// the SDK, piping updated configs from a third-party source to an instrumented
+// application.
+service MetricConfig {
+ rpc GetMetricConfig (MetricConfigRequest) returns (MetricConfigResponse);
+}
+
+message MetricConfigRequest{
+
+ // Required. The resource for which configuration should be returned.
+ opentelemetry.proto.resource.v1.Resource resource = 1;
+
+ // Optional. The value of MetricConfigResponse.fingerprint for the last
+ // configuration that the caller received and successfully applied.
+ bytes last_known_fingerprint = 2;
+}
+
+message MetricConfigResponse {
+
+ // Optional. The fingerprint associated with this MetricConfigResponse. Each
+ // change in configs yields a different fingerprint. The resource SHOULD copy
+ // this value to MetricConfigRequest.last_known_fingerprint for the next
+ // configuration request. If there are no changes between fingerprint and
+ // MetricConfigRequest.last_known_fingerprint, then all other fields besides
+ // fingerprint in the response are optional, or the same as the last update if
+ // present.
+ //
+ // The exact mechanics of generating the fingerprint is up to the
+ // implementation. However, a fingerprint must be deterministically determined
+ // by the configurations -- the same configuration will generate the same
+ // fingerprint on any instance of an implementation. Hence using a timestamp is
+ // unacceptable, but a deterministic hash is fine.
+ bytes fingerprint = 1;
+
+ // A Schedule is used to apply a particular scheduling configuration to
+ // a metric. If a metric name matches a schedule's patterns, then the metric
+ // adopts the configuration specified by the schedule.
+ message Schedule {
+
+ // A light-weight pattern that can match 1 or more
+ // metrics, for which this schedule will apply. The string is used to
+ // match against metric names. It should not exceed 100k characters.
+ message Pattern {
+ oneof match {
+ string equals = 1; // matches the metric name exactly
+ string starts_with = 2; // prefix-matches the metric name
+ }
+ }
+
+ // Metrics with names that match a rule in the inclusion_patterns are
+ // targeted by this schedule. Metrics that match the exclusion_patterns
+ // are not targeted for this schedule, even if they match an inclusion
+ // pattern.
+ repeated Pattern exclusion_patterns = 1;
+ repeated Pattern inclusion_patterns = 2;
+
+ // Describes the collection period for each metric in seconds.
+ // A period of 0 means to not export.
+ int32 period_sec = 3;
+ }
+
+ // A single metric may match multiple schedules. In such cases, the schedule
+ // that specifies the smallest period is applied.
+ //
+ // Note, for optimization purposes, it is recommended to use as few schedules
+ // as possible to capture all required metric updates. Where you can be
+ // conservative, do take full advantage of the inclusion/exclusion patterns to
+ // capture as much of your targeted metrics.
+ repeated Schedule schedules = 2;
+
+ // Optional. The client is suggested to wait this long (in seconds) before
+ // pinging the configuration service again.
+ int32 suggested_wait_time_sec = 3;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/v1/metrics.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/v1/metrics.proto
new file mode 100644
index 0000000000..f0a76125e9
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/v1/metrics.proto
@@ -0,0 +1,636 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.metrics.v1;
+
+import "opentelemetry/proto/common/v1/common.proto";
+import "opentelemetry/proto/resource/v1/resource.proto";
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.metrics.firehose.v0_7";
+option java_outer_classname = "MetricsProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1";
+
+// A collection of InstrumentationLibraryMetrics from a Resource.
+message ResourceMetrics {
+ // The resource for the metrics in this message.
+ // If this field is not set then no resource info is known.
+ opentelemetry.proto.resource.v1.Resource resource = 1;
+
+ // A list of metrics that originate from a resource.
+ repeated InstrumentationLibraryMetrics instrumentation_library_metrics = 2;
+}
+
+// A collection of Metrics produced by an InstrumentationLibrary.
+message InstrumentationLibraryMetrics {
+ // The instrumentation library information for the metrics in this message.
+ // Semantically when InstrumentationLibrary isn't set, it is equivalent with
+ // an empty instrumentation library name (unknown).
+ opentelemetry.proto.common.v1.InstrumentationLibrary instrumentation_library = 1;
+
+ // A list of metrics that originate from an instrumentation library.
+ repeated Metric metrics = 2;
+}
+
+// Defines a Metric which has one or more timeseries.
+//
+// The data model and relation between entities is shown in the
+// diagram below. Here, "DataPoint" is the term used to refer to any
+// one of the specific data point value types, and "points" is the term used
+// to refer to any one of the lists of points contained in the Metric.
+//
+// - Metric is composed of a metadata and data.
+// - Metadata part contains a name, description, unit.
+// - Data is one of the possible types (Gauge, Sum, Histogram, etc.).
+// - DataPoint contains timestamps, labels, and one of the possible value type
+// fields.
+//
+// Metric
+// +------------+
+// |name |
+// |description |
+// |unit | +------------------------------------+
+// |data |---> |Gauge, Sum, Histogram, Summary, ... |
+// +------------+ +------------------------------------+
+//
+// Data [One of Gauge, Sum, Histogram, Summary, ...]
+// +-----------+
+// |... | // Metadata about the Data.
+// |points |--+
+// +-----------+ |
+// | +---------------------------+
+// | |DataPoint 1 |
+// v |+------+------+ +------+ |
+// +-----+ ||label |label |...|label | |
+// | 1 |-->||value1|value2|...|valueN| |
+// +-----+ |+------+------+ +------+ |
+// | . | |+-----+ |
+// | . | ||value| |
+// | . | |+-----+ |
+// | . | +---------------------------+
+// | . | .
+// | . | .
+// | . | .
+// | . | +---------------------------+
+// | . | |DataPoint M |
+// +-----+ |+------+------+ +------+ |
+// | M |-->||label |label |...|label | |
+// +-----+ ||value1|value2|...|valueN| |
+// |+------+------+ +------+ |
+// |+-----+ |
+// ||value| |
+// |+-----+ |
+// +---------------------------+
+//
+// All DataPoint types have three common fields:
+// - Labels zero or more key-value pairs associated with the data point.
+// - StartTimeUnixNano MUST be set to the start of the interval when the data's
+// type includes an AggregationTemporality. This field is not set otherwise.
+// - TimeUnixNano MUST be set to:
+// - the moment when an aggregation is reported (independent of the
+// aggregation temporality).
+// - the instantaneous time of the event.
+message Metric {
+ // name of the metric, including its DNS name prefix. It must be unique.
+ string name = 1;
+
+ // description of the metric, which can be used in documentation.
+ string description = 2;
+
+ // unit in which the metric value is reported. Follows the format
+ // described by http://unitsofmeasure.org/ucum.html.
+ string unit = 3;
+
+ // TODO: Decide if support for RawMeasurements (measurements recorded using
+ // the synchronous instruments) is necessary. It can be used to delegate the
+ // aggregation from the application to the agent/collector. See
+ // https://github.com/open-telemetry/opentelemetry-specification/issues/617
+
+ // Data determines the aggregation type (if any) of the metric, what is the
+ // reported value type for the data points, as well as the relationship to
+ // the time interval over which they are reported.
+ //
+ // TODO: Update table after the decision on:
+ // https://github.com/open-telemetry/opentelemetry-specification/issues/731.
+ // By default, metrics recording using the OpenTelemetry API are exported as
+ // (the table does not include MeasurementValueType to avoid extra rows):
+ //
+ // Instrument Type
+ // ----------------------------------------------
+ // Counter Sum(aggregation_temporality=delta;is_monotonic=true)
+ // UpDownCounter Sum(aggregation_temporality=delta;is_monotonic=false)
+ // ValueRecorder TBD
+ // SumObserver Sum(aggregation_temporality=cumulative;is_monotonic=true)
+ // UpDownSumObserver Sum(aggregation_temporality=cumulative;is_monotonic=false)
+ // ValueObserver Gauge()
+ oneof data {
+ IntGauge int_gauge = 4;
+ DoubleGauge double_gauge = 5;
+ IntSum int_sum = 6;
+ DoubleSum double_sum = 7;
+ IntHistogram int_histogram = 8;
+ DoubleHistogram double_histogram = 9;
+ DoubleSummary double_summary = 11;
+ }
+}
+
+// Gauge represents the type of an int scalar metric that always exports the
+// "current value" for every data point. It should be used for an "unknown"
+// aggregation.
+//
+// A Gauge does not support different aggregation temporalities. Given the
+// aggregation is unknown, points cannot be combined using the same
+// aggregation, regardless of aggregation temporalities. Therefore,
+// AggregationTemporality is not included. Consequently, this also means
+// "StartTimeUnixNano" is ignored for all data points.
+message IntGauge {
+ repeated IntDataPoint data_points = 1;
+}
+
+// Gauge represents the type of a double scalar metric that always exports the
+// "current value" for every data point. It should be used for an "unknown"
+// aggregation.
+//
+// A Gauge does not support different aggregation temporalities. Given the
+// aggregation is unknown, points cannot be combined using the same
+// aggregation, regardless of aggregation temporalities. Therefore,
+// AggregationTemporality is not included. Consequently, this also means
+// "StartTimeUnixNano" is ignored for all data points.
+message DoubleGauge {
+ repeated DoubleDataPoint data_points = 1;
+}
+
+// Sum represents the type of a numeric int scalar metric that is calculated as
+// a sum of all reported measurements over a time interval.
+message IntSum {
+ repeated IntDataPoint data_points = 1;
+
+ // aggregation_temporality describes if the aggregator reports delta changes
+ // since last report time, or cumulative changes since a fixed start time.
+ AggregationTemporality aggregation_temporality = 2;
+
+ // If "true" means that the sum is monotonic.
+ bool is_monotonic = 3;
+}
+
+// Sum represents the type of a numeric double scalar metric that is calculated
+// as a sum of all reported measurements over a time interval.
+message DoubleSum {
+ repeated DoubleDataPoint data_points = 1;
+
+ // aggregation_temporality describes if the aggregator reports delta changes
+ // since last report time, or cumulative changes since a fixed start time.
+ AggregationTemporality aggregation_temporality = 2;
+
+ // If "true" means that the sum is monotonic.
+ bool is_monotonic = 3;
+}
+
+// Represents the type of a metric that is calculated by aggregating as a
+// Histogram of all reported int measurements over a time interval.
+message IntHistogram {
+ repeated IntHistogramDataPoint data_points = 1;
+
+ // aggregation_temporality describes if the aggregator reports delta changes
+ // since last report time, or cumulative changes since a fixed start time.
+ AggregationTemporality aggregation_temporality = 2;
+}
+
+// Represents the type of a metric that is calculated by aggregating as a
+// Histogram of all reported double measurements over a time interval.
+message DoubleHistogram {
+ repeated DoubleHistogramDataPoint data_points = 1;
+
+ // aggregation_temporality describes if the aggregator reports delta changes
+ // since last report time, or cumulative changes since a fixed start time.
+ AggregationTemporality aggregation_temporality = 2;
+}
+
+// DoubleSummary metric data are used to convey quantile summaries,
+// a Prometheus (see: https://prometheus.io/docs/concepts/metric_types/#summary)
+// and OpenMetrics (see: https://github.com/OpenObservability/OpenMetrics/blob/4dbf6075567ab43296eed941037c12951faafb92/protos/prometheus.proto#L45)
+// data type. These data points cannot always be merged in a meaningful way.
+// While they can be useful in some applications, histogram data points are
+// recommended for new applications.
+message DoubleSummary {
+ repeated DoubleSummaryDataPoint data_points = 1;
+}
+
+// AggregationTemporality defines how a metric aggregator reports aggregated
+// values. It describes how those values relate to the time interval over
+// which they are aggregated.
+enum AggregationTemporality {
+ // UNSPECIFIED is the default AggregationTemporality, it MUST NOT be used.
+ AGGREGATION_TEMPORALITY_UNSPECIFIED = 0;
+
+ // DELTA is an AggregationTemporality for a metric aggregator which reports
+ // changes since last report time. Successive metrics contain aggregation of
+ // values from continuous and non-overlapping intervals.
+ //
+ // The values for a DELTA metric are based only on the time interval
+ // associated with one measurement cycle. There is no dependency on
+ // previous measurements like is the case for CUMULATIVE metrics.
+ //
+ // For example, consider a system measuring the number of requests that
+ // it receives and reports the sum of these requests every second as a
+ // DELTA metric:
+ //
+ // 1. The system starts receiving at time=t_0.
+ // 2. A request is received, the system measures 1 request.
+ // 3. A request is received, the system measures 1 request.
+ // 4. A request is received, the system measures 1 request.
+ // 5. The 1 second collection cycle ends. A metric is exported for the
+ // number of requests received over the interval of time t_0 to
+ // t_0+1 with a value of 3.
+ // 6. A request is received, the system measures 1 request.
+ // 7. A request is received, the system measures 1 request.
+ // 8. The 1 second collection cycle ends. A metric is exported for the
+ // number of requests received over the interval of time t_0+1 to
+ // t_0+2 with a value of 2.
+ AGGREGATION_TEMPORALITY_DELTA = 1;
+
+ // CUMULATIVE is an AggregationTemporality for a metric aggregator which
+ // reports changes since a fixed start time. This means that current values
+ // of a CUMULATIVE metric depend on all previous measurements since the
+ // start time. Because of this, the sender is required to retain this state
+ // in some form. If this state is lost or invalidated, the CUMULATIVE metric
+ // values MUST be reset and a new fixed start time following the last
+ // reported measurement time sent MUST be used.
+ //
+ // For example, consider a system measuring the number of requests that
+ // it receives and reports the sum of these requests every second as a
+ // CUMULATIVE metric:
+ //
+ // 1. The system starts receiving at time=t_0.
+ // 2. A request is received, the system measures 1 request.
+ // 3. A request is received, the system measures 1 request.
+ // 4. A request is received, the system measures 1 request.
+ // 5. The 1 second collection cycle ends. A metric is exported for the
+ // number of requests received over the interval of time t_0 to
+ // t_0+1 with a value of 3.
+ // 6. A request is received, the system measures 1 request.
+ // 7. A request is received, the system measures 1 request.
+ // 8. The 1 second collection cycle ends. A metric is exported for the
+ // number of requests received over the interval of time t_0 to
+ // t_0+2 with a value of 5.
+ // 9. The system experiences a fault and loses state.
+ // 10. The system recovers and resumes receiving at time=t_1.
+ // 11. A request is received, the system measures 1 request.
+ // 12. The 1 second collection cycle ends. A metric is exported for the
+ // number of requests received over the interval of time t_1 to
+ // t_1+1 with a value of 1.
+ //
+ // Note: Even though, when reporting changes since last report time, using
+ // CUMULATIVE is valid, it is not recommended. This may cause problems for
+ // systems that do not use start_time to determine when the aggregation
+ // value was reset (e.g. Prometheus).
+ AGGREGATION_TEMPORALITY_CUMULATIVE = 2;
+}
+
+// IntDataPoint is a single data point in a timeseries that describes the
+// time-varying values of an int64 metric.
+message IntDataPoint {
+ // The set of labels that uniquely identify this timeseries.
+ repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+ // start_time_unix_nano is the last time when the aggregation value was reset
+ // to "zero". For some metric types this is ignored, see data types for more
+ // details.
+ //
+ // The aggregation value is over the time interval (start_time_unix_nano,
+ // time_unix_nano].
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ //
+ // Value of 0 indicates that the timestamp is unspecified. In that case the
+ // timestamp may be decided by the backend.
+ fixed64 start_time_unix_nano = 2;
+
+ // time_unix_nano is the moment when this aggregation value was reported.
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ fixed64 time_unix_nano = 3;
+
+ // value itself.
+ sfixed64 value = 4;
+
+ // (Optional) List of exemplars collected from
+ // measurements that were used to form the data point
+ repeated IntExemplar exemplars = 5;
+}
+
+// DoubleDataPoint is a single data point in a timeseries that describes the
+// time-varying value of a double metric.
+message DoubleDataPoint {
+ // The set of labels that uniquely identify this timeseries.
+ repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+ // start_time_unix_nano is the last time when the aggregation value was reset
+ // to "zero". For some metric types this is ignored, see data types for more
+ // details.
+ //
+ // The aggregation value is over the time interval (start_time_unix_nano,
+ // time_unix_nano].
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ //
+ // Value of 0 indicates that the timestamp is unspecified. In that case the
+ // timestamp may be decided by the backend.
+ fixed64 start_time_unix_nano = 2;
+
+ // time_unix_nano is the moment when this aggregation value was reported.
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ fixed64 time_unix_nano = 3;
+
+ // value itself.
+ double value = 4;
+
+ // (Optional) List of exemplars collected from
+ // measurements that were used to form the data point
+ repeated DoubleExemplar exemplars = 5;
+}
+
+// IntHistogramDataPoint is a single data point in a timeseries that describes
+// the time-varying values of a Histogram of int values. A Histogram contains
+// summary statistics for a population of values, it may optionally contain
+// the distribution of those values across a set of buckets.
+message IntHistogramDataPoint {
+ // The set of labels that uniquely identify this timeseries.
+ repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+ // start_time_unix_nano is the last time when the aggregation value was reset
+ // to "zero". For some metric types this is ignored, see data types for more
+ // details.
+ //
+ // The aggregation value is over the time interval (start_time_unix_nano,
+ // time_unix_nano].
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ //
+ // Value of 0 indicates that the timestamp is unspecified. In that case the
+ // timestamp may be decided by the backend.
+ fixed64 start_time_unix_nano = 2;
+
+ // time_unix_nano is the moment when this aggregation value was reported.
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ fixed64 time_unix_nano = 3;
+
+ // count is the number of values in the population. Must be non-negative. This
+ // value must be equal to the sum of the "count" fields in buckets if a
+ // histogram is provided.
+ fixed64 count = 4;
+
+ // sum of the values in the population. If count is zero then this field
+ // must be zero. This value must be equal to the sum of the "sum" fields in
+ // buckets if a histogram is provided.
+ sfixed64 sum = 5;
+
+ // bucket_counts is an optional field that contains the histogram's count
+ // values for each bucket.
+ //
+ // The sum of the bucket_counts must equal the value in the count field.
+ //
+ // The number of elements in the bucket_counts array must be one greater than
+ // the number of elements in the explicit_bounds array.
+ repeated fixed64 bucket_counts = 6;
+
+ // A histogram may optionally contain the distribution of the values in the population.
+ // In that case one of the option fields below and "buckets" field both must be defined.
+ // Otherwise all option fields and "buckets" field must be omitted in which case the
+ // distribution of values in the histogram is unknown and only the total count and sum are known.
+
+ // explicit_bounds is the only supported bucket option currently.
+ // TODO: Add more bucket options.
+
+ // explicit_bounds specifies buckets with explicitly defined bounds for values.
+ // The bucket boundaries are described by "bounds" field.
+ //
+ // This defines size(bounds) + 1 (= N) buckets. The boundaries for bucket
+ // at index i are:
+ //
+ // (-infinity, bounds[i]) for i == 0
+ // [bounds[i-1], bounds[i]) for 0 < i < N-1
+ // [bounds[i-1], +infinity) for i == N-1
+ // The values in bounds array must be strictly increasing.
+ //
+ // Note: only [a, b) intervals are currently supported for each bucket except the first one.
+ // If we decide to also support (a, b] intervals we should add support for these by defining
+ // a boolean value which decides what type of intervals to use.
+ repeated double explicit_bounds = 7;
+
+ // (Optional) List of exemplars collected from
+ // measurements that were used to form the data point
+ repeated IntExemplar exemplars = 8;
+}
+
+// DoubleHistogramDataPoint is a single data point in a timeseries that describes the
+// time-varying values of a Histogram of double values. A Histogram contains
+// summary statistics for a population of values, it may optionally contain the
+// distribution of those values across a set of buckets.
+message DoubleHistogramDataPoint {
+ // The set of labels that uniquely identify this timeseries.
+ repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+ // start_time_unix_nano is the last time when the aggregation value was reset
+ // to "zero". For some metric types this is ignored, see data types for more
+ // details.
+ //
+ // The aggregation value is over the time interval (start_time_unix_nano,
+ // time_unix_nano].
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ //
+ // Value of 0 indicates that the timestamp is unspecified. In that case the
+ // timestamp may be decided by the backend.
+ fixed64 start_time_unix_nano = 2;
+
+ // time_unix_nano is the moment when this aggregation value was reported.
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ fixed64 time_unix_nano = 3;
+
+ // count is the number of values in the population. Must be non-negative. This
+ // value must be equal to the sum of the "count" fields in buckets if a
+ // histogram is provided.
+ fixed64 count = 4;
+
+ // sum of the values in the population. If count is zero then this field
+ // must be zero. This value must be equal to the sum of the "sum" fields in
+ // buckets if a histogram is provided.
+ double sum = 5;
+
+ // bucket_counts is an optional field that contains the histogram's count
+ // values for each bucket.
+ //
+ // The sum of the bucket_counts must equal the value in the count field.
+ //
+ // The number of elements in the bucket_counts array must be one greater than
+ // the number of elements in the explicit_bounds array.
+ repeated fixed64 bucket_counts = 6;
+
+ // A histogram may optionally contain the distribution of the values in the population.
+ // In that case one of the option fields below and "buckets" field both must be defined.
+ // Otherwise all option fields and "buckets" field must be omitted in which case the
+ // distribution of values in the histogram is unknown and only the total count and sum are known.
+
+ // explicit_bounds is the only supported bucket option currently.
+ // TODO: Add more bucket options.
+
+ // explicit_bounds specifies buckets with explicitly defined bounds for values.
+ // The bucket boundaries are described by "bounds" field.
+ //
+ // This defines size(bounds) + 1 (= N) buckets. The boundaries for bucket
+ // at index i are:
+ //
+ // (-infinity, bounds[i]) for i == 0
+ // [bounds[i-1], bounds[i]) for 0 < i < N-1
+ // [bounds[i-1], +infinity) for i == N-1
+ // The values in bounds array must be strictly increasing.
+ //
+ // Note: only [a, b) intervals are currently supported for each bucket except the first one.
+ // If we decide to also support (a, b] intervals we should add support for these by defining
+ // a boolean value which decides what type of intervals to use.
+ repeated double explicit_bounds = 7;
+
+ // (Optional) List of exemplars collected from
+ // measurements that were used to form the data point
+ repeated DoubleExemplar exemplars = 8;
+}
+
+// DoubleSummaryDataPoint is a single data point in a timeseries that describes the
+// time-varying values of a Summary metric.
+message DoubleSummaryDataPoint {
+ // The set of labels that uniquely identify this timeseries.
+ repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+ // start_time_unix_nano is the last time when the aggregation value was reset
+ // to "zero". For some metric types this is ignored, see data types for more
+ // details.
+ //
+ // The aggregation value is over the time interval (start_time_unix_nano,
+ // time_unix_nano].
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ //
+ // Value of 0 indicates that the timestamp is unspecified. In that case the
+ // timestamp may be decided by the backend.
+ fixed64 start_time_unix_nano = 2;
+
+ // time_unix_nano is the moment when this aggregation value was reported.
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ fixed64 time_unix_nano = 3;
+
+ // count is the number of values in the population. Must be non-negative.
+ fixed64 count = 4;
+
+ // sum of the values in the population. If count is zero then this field
+ // must be zero.
+ double sum = 5;
+
+ // Represents the value at a given quantile of a distribution.
+ //
+ // To record Min and Max values, the following conventions are used:
+ // - The 1.0 quantile is equivalent to the maximum value observed.
+ // - The 0.0 quantile is equivalent to the minimum value observed.
+ //
+ // See the following issue for more context:
+ // https://github.com/open-telemetry/opentelemetry-proto/issues/125
+ message ValueAtQuantile {
+ // The quantile of a distribution. Must be in the interval
+ // [0.0, 1.0].
+ double quantile = 1;
+
+ // The value at the given quantile of a distribution.
+ double value = 2;
+ }
+
+ // (Optional) list of values at different quantiles of the distribution calculated
+ // from the current snapshot. The quantiles must be strictly increasing.
+ repeated ValueAtQuantile quantile_values = 6;
+}
+
+// A representation of an exemplar, which is a sample input int measurement.
+// Exemplars also hold information about the environment when the measurement
+// was recorded, for example the span and trace ID of the active span when the
+// exemplar was recorded.
+message IntExemplar {
+ // The set of labels that were filtered out by the aggregator, but recorded
+ // alongside the original measurement. Only labels that were filtered out
+ // by the aggregator should be included
+ repeated opentelemetry.proto.common.v1.StringKeyValue filtered_labels = 1;
+
+ // time_unix_nano is the exact time when this exemplar was recorded
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ fixed64 time_unix_nano = 2;
+
+ // Numerical int value of the measurement that was recorded.
+ sfixed64 value = 3;
+
+ // (Optional) Span ID of the exemplar trace.
+ // span_id may be missing if the measurement is not recorded inside a trace
+ // or if the trace is not sampled.
+ bytes span_id = 4;
+
+ // (Optional) Trace ID of the exemplar trace.
+ // trace_id may be missing if the measurement is not recorded inside a trace
+ // or if the trace is not sampled.
+ bytes trace_id = 5;
+}
+
+// A representation of an exemplar, which is a sample input double measurement.
+// Exemplars also hold information about the environment when the measurement
+// was recorded, for example the span and trace ID of the active span when the
+// exemplar was recorded.
+message DoubleExemplar {
+ // The set of labels that were filtered out by the aggregator, but recorded
+ // alongside the original measurement. Only labels that were filtered out
+ // by the aggregator should be included
+ repeated opentelemetry.proto.common.v1.StringKeyValue filtered_labels = 1;
+
+ // time_unix_nano is the exact time when this exemplar was recorded
+ //
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+ // 1970.
+ fixed64 time_unix_nano = 2;
+
+ // Numerical double value of the measurement that was recorded.
+ double value = 3;
+
+ // (Optional) Span ID of the exemplar trace.
+ // span_id may be missing if the measurement is not recorded inside a trace
+ // or if the trace is not sampled.
+ bytes span_id = 4;
+
+ // (Optional) Trace ID of the exemplar trace.
+ // trace_id may be missing if the measurement is not recorded inside a trace
+ // or if the trace is not sampled.
+ bytes trace_id = 5;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/resource/v1/resource.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/resource/v1/resource.proto
new file mode 100644
index 0000000000..88a735a746
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/resource/v1/resource.proto
@@ -0,0 +1,34 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.resource.v1;
+
+import "opentelemetry/proto/common/v1/common.proto";
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.resource.firehose.v0_7";
+option java_outer_classname = "ResourceProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1";
+
+// Resource information.
+message Resource {
+ // Set of attributes (key/value pairs) that describe the resource.
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 1;
+
+ // dropped_attributes_count is the number of dropped attributes. If the value is 0, then
+ // no attributes were dropped.
+ uint32 dropped_attributes_count = 2;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertorTest.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertorTest.java
new file mode 100644
index 0000000000..1a97e9c9aa
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks that {@link OtelMetricsConvertor} turns OTEL 0.7.0 metric requests into the expected v1 requests. */
+public class OtelMetricsConvertorTest {
+    /** Converts every {@code source.json} and compares it, as a parsed JSON tree, with {@code expect.json}. */
+    @Test
+    public void test() throws IOException {
+        final List<TestData> cases = findTestData();
+        // Fail instead of passing without any check when the fixture directory cannot be resolved.
+        Assert.assertFalse("no test data found under convertor-test-data", cases.isEmpty());
+        final Gson gson = new Gson();
+        for (TestData testData : cases) {
+            final String converted = JsonFormat.printer().print(convertSource(testData.getSourceFile()));
+            final Map<?, ?> convertedData = gson.fromJson(converted, Map.class);
+            final Map<?, ?> expect = gson.fromJson(readUtf8(testData.getExpectFile()), Map.class);
+            final String message = String.format("diff , %s -> %s", testData.getSourceFile(), testData.getExpectFile());
+            Assert.assertEquals(message, expect, convertedData);
+        }
+    }
+
+    /** Parses {@code sourceFile} as an OTEL 0.7.0 request and passes it to the convertor. */
+    private io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest convertSource(final File sourceFile) throws IOException {
+        final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder();
+        JsonFormat.parser().merge(readUtf8(sourceFile), builder);
+        return OtelMetricsConvertor.convertExportMetricsRequest(builder.build());
+    }
+
+    /** Reads a fixture as UTF-8 so that the result does not depend on the platform default charset. */
+    private static String readUtf8(final File file) throws IOException {
+        return new String(Files.readAllBytes(file.toPath()), java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    /** Lists one case per sub-directory of the fixture directory; empty when it cannot be listed. */
+    private List<TestData> findTestData() {
+        final List<TestData> res = new ArrayList<>();
+        final Path resourceDirectory = Paths.get("src", "test", "resources", "convertor-test-data");
+        final File[] subFiles = resourceDirectory.toFile().listFiles(File::isDirectory);
+        if (subFiles != null) {
+            for (File subFile : subFiles) {
+                final String dir = subFile.getAbsolutePath();
+                res.add(new TestData(new File(dir, "source.json"), new File(dir, "expect.json")));
+            }
+        }
+        return res;
+    }
+
+    /** One conversion case: {@code sourceFile} is the OTEL 0.7.0 input, {@code expectFile} the expected output. */
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    private static class TestData {
+        private File sourceFile;
+        private File expectFile;
+    }
+}
+
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/expect.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/expect.json
new file mode 100644
index 0000000000..7d97bca7bf
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/expect.json
@@ -0,0 +1,132 @@
+{
+ "resourceMetrics": [
+ {
+ "resource": {
+ "attributes": [
+ {
+ "key": "cloud.provider",
+ "value": {
+ "stringValue": "aws"
+ }
+ },
+ {
+ "key": "cloud.account.id",
+ "value": {
+ "stringValue": "xxxxxxxx"
+ }
+ },
+ {
+ "key": "cloud.region",
+ "value": {
+ "stringValue": "ap-northeast-1"
+ }
+ },
+ {
+ "key": "aws.exporter.arn",
+ "value": {
+ "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+ }
+ }
+ ]
+ },
+ "scopeMetrics": [
+ {
+ "metrics": [
+ {
+ "name": "amazonaws.com/AWS/S3/4xxErrors",
+ "unit": "{Count}",
+ "summary": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "quantileValues": [
+ {
+ },
+ {
+ "quantile": 1.0
+ }
+ ],
+ "attributes": [
+ {
+ "key": "Namespace",
+ "value": {
+ "stringValue": "AWS/S3"
+ }
+ },
+ {
+ "key": "MetricName",
+ "value": {
+ "stringValue": "4xxErrors"
+ }
+ },
+ {
+ "key": "BucketName",
+ "value": {
+ "stringValue": "skywalking"
+ }
+ },
+ {
+ "key": "FilterId",
+ "value": {
+ "stringValue": "test"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/5xxErrors",
+ "unit": "{Count}",
+ "summary": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "quantileValues": [
+ {
+ },
+ {
+ "quantile": 1.0
+ }
+ ],
+ "attributes": [
+ {
+ "key": "Namespace",
+ "value": {
+ "stringValue": "AWS/S3"
+ }
+ },
+ {
+ "key": "MetricName",
+ "value": {
+ "stringValue": "5xxErrors"
+ }
+ },
+ {
+ "key": "BucketName",
+ "value": {
+ "stringValue": "skywalking"
+ }
+ },
+ {
+ "key": "FilterId",
+ "value": {
+ "stringValue": "test"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/source.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/source.json
new file mode 100644
index 0000000000..4a454e6ba7
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/source.json
@@ -0,0 +1,116 @@
+{
+ "resourceMetrics": [
+ {
+ "resource": {
+ "attributes": [
+ {
+ "key": "cloud.provider",
+ "value": {
+ "stringValue": "aws"
+ }
+ },
+ {
+ "key": "cloud.account.id",
+ "value": {
+ "stringValue": "xxxxxxxx"
+ }
+ },
+ {
+ "key": "cloud.region",
+ "value": {
+ "stringValue": "ap-northeast-1"
+ }
+ },
+ {
+ "key": "aws.exporter.arn",
+ "value": {
+ "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+ }
+ }
+ ]
+ },
+ "instrumentationLibraryMetrics": [
+ {
+ "metrics": [
+ {
+ "name": "amazonaws.com/AWS/S3/4xxErrors",
+ "unit": "{Count}",
+ "doubleSummary": {
+ "dataPoints": [
+ {
+ "labels": [
+ {
+ "key": "Namespace",
+ "value": "AWS/S3"
+ },
+ {
+ "key": "MetricName",
+ "value": "4xxErrors"
+ },
+ {
+ "key": "BucketName",
+ "value": "skywalking"
+ },
+ {
+ "key": "FilterId",
+ "value": "test"
+ }
+ ],
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "quantileValues": [
+ {
+ },
+ {
+ "quantile": 1.0
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/5xxErrors",
+ "unit": "{Count}",
+ "doubleSummary": {
+ "dataPoints": [
+ {
+ "labels": [
+ {
+ "key": "Namespace",
+ "value": "AWS/S3"
+ },
+ {
+ "key": "MetricName",
+ "value": "5xxErrors"
+ },
+ {
+ "key": "BucketName",
+ "value": "skywalking"
+ },
+ {
+ "key": "FilterId",
+ "value": "test"
+ }
+ ],
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "quantileValues": [
+ {
+ },
+ {
+ "quantile": 1.0
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/expect.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/expect.json
new file mode 100644
index 0000000000..cddc0dae9b
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/expect.json
@@ -0,0 +1,285 @@
+{
+ "resourceMetrics": [
+ {
+ "resource": {
+ "attributes": [
+ {
+ "key": "cloud.provider",
+ "value": {
+ "stringValue": "aws"
+ }
+ },
+ {
+ "key": "cloud.account.id",
+ "value": {
+ "stringValue": "xxxxxxxx"
+ }
+ },
+ {
+ "key": "cloud.region",
+ "value": {
+ "stringValue": "ap-northeast-1"
+ }
+ },
+ {
+ "key": "aws.exporter.arn",
+ "value": {
+ "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+ }
+ }
+ ]
+ },
+ "scopeMetrics": [
+ {
+ "metrics": [
+ {
+ "name": "amazonaws.com/AWS/S3/BytesDownloaded",
+ "unit": "By",
+ "summary": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 166199.0,
+ "quantileValues": [
+ {
+ "value": 166199.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 166199.0
+ }
+ ],
+ "attributes": [
+ {
+ "key": "Namespace",
+ "value": {
+ "stringValue": "AWS/S3"
+ }
+ },
+ {
+ "key": "MetricName",
+ "value": {
+ "stringValue": "BytesDownloaded"
+ }
+ },
+ {
+ "key": "BucketName",
+ "value": {
+ "stringValue": "skywalking"
+ }
+ },
+ {
+ "key": "FilterId",
+ "value": {
+ "stringValue": "test"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/TotalRequestLatency",
+ "unit": "ms",
+ "summary": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 66.0,
+ "quantileValues": [
+ {
+ "value": 66.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 66.0
+ }
+ ],
+ "attributes": [
+ {
+ "key": "Namespace",
+ "value": {
+ "stringValue": "AWS/S3"
+ }
+ },
+ {
+ "key": "MetricName",
+ "value": {
+ "stringValue": "TotalRequestLatency"
+ }
+ },
+ {
+ "key": "BucketName",
+ "value": {
+ "stringValue": "skywalking"
+ }
+ },
+ {
+ "key": "FilterId",
+ "value": {
+ "stringValue": "test"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/FirstByteLatency",
+ "unit": "ms",
+ "summary": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 64.0,
+ "quantileValues": [
+ {
+ "value": 64.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 64.0
+ }
+ ],
+ "attributes": [
+ {
+ "key": "Namespace",
+ "value": {
+ "stringValue": "AWS/S3"
+ }
+ },
+ {
+ "key": "MetricName",
+ "value": {
+ "stringValue": "FirstByteLatency"
+ }
+ },
+ {
+ "key": "BucketName",
+ "value": {
+ "stringValue": "skywalking"
+ }
+ },
+ {
+ "key": "FilterId",
+ "value": {
+ "stringValue": "test"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/GetRequests",
+ "unit": "{Count}",
+ "summary": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 1.0,
+ "quantileValues": [
+ {
+ "value": 1.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 1.0
+ }
+ ],
+ "attributes": [
+ {
+ "key": "Namespace",
+ "value": {
+ "stringValue": "AWS/S3"
+ }
+ },
+ {
+ "key": "MetricName",
+ "value": {
+ "stringValue": "GetRequests"
+ }
+ },
+ {
+ "key": "BucketName",
+ "value": {
+ "stringValue": "skywalking"
+ }
+ },
+ {
+ "key": "FilterId",
+ "value": {
+ "stringValue": "test"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/AllRequests",
+ "unit": "{Count}",
+ "summary": {
+ "dataPoints": [
+ {
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 1.0,
+ "quantileValues": [
+ {
+ "value": 1.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 1.0
+ }
+ ],
+ "attributes": [
+ {
+ "key": "Namespace",
+ "value": {
+ "stringValue": "AWS/S3"
+ }
+ },
+ {
+ "key": "MetricName",
+ "value": {
+ "stringValue": "AllRequests"
+ }
+ },
+ {
+ "key": "BucketName",
+ "value": {
+ "stringValue": "skywalking"
+ }
+ },
+ {
+ "key": "FilterId",
+ "value": {
+ "stringValue": "test"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/source.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/source.json
new file mode 100644
index 0000000000..acaae8a04d
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/source.json
@@ -0,0 +1,245 @@
+{
+ "resourceMetrics": [
+ {
+ "resource": {
+ "attributes": [
+ {
+ "key": "cloud.provider",
+ "value": {
+ "stringValue": "aws"
+ }
+ },
+ {
+ "key": "cloud.account.id",
+ "value": {
+ "stringValue": "xxxxxxxx"
+ }
+ },
+ {
+ "key": "cloud.region",
+ "value": {
+ "stringValue": "ap-northeast-1"
+ }
+ },
+ {
+ "key": "aws.exporter.arn",
+ "value": {
+ "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+ }
+ }
+ ]
+ },
+ "instrumentationLibraryMetrics": [
+ {
+ "metrics": [
+ {
+ "name": "amazonaws.com/AWS/S3/BytesDownloaded",
+ "unit": "By",
+ "doubleSummary": {
+ "dataPoints": [
+ {
+ "labels": [
+ {
+ "key": "Namespace",
+ "value": "AWS/S3"
+ },
+ {
+ "key": "MetricName",
+ "value": "BytesDownloaded"
+ },
+ {
+ "key": "BucketName",
+ "value": "skywalking"
+ },
+ {
+ "key": "FilterId",
+ "value": "test"
+ }
+ ],
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 166199.0,
+ "quantileValues": [
+ {
+ "value": 166199.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 166199.0
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/TotalRequestLatency",
+ "unit": "ms",
+ "doubleSummary": {
+ "dataPoints": [
+ {
+ "labels": [
+ {
+ "key": "Namespace",
+ "value": "AWS/S3"
+ },
+ {
+ "key": "MetricName",
+ "value": "TotalRequestLatency"
+ },
+ {
+ "key": "BucketName",
+ "value": "skywalking"
+ },
+ {
+ "key": "FilterId",
+ "value": "test"
+ }
+ ],
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 66.0,
+ "quantileValues": [
+ {
+ "value": 66.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 66.0
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/FirstByteLatency",
+ "unit": "ms",
+ "doubleSummary": {
+ "dataPoints": [
+ {
+ "labels": [
+ {
+ "key": "Namespace",
+ "value": "AWS/S3"
+ },
+ {
+ "key": "MetricName",
+ "value": "FirstByteLatency"
+ },
+ {
+ "key": "BucketName",
+ "value": "skywalking"
+ },
+ {
+ "key": "FilterId",
+ "value": "test"
+ }
+ ],
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 64.0,
+ "quantileValues": [
+ {
+ "value": 64.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 64.0
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/GetRequests",
+ "unit": "{Count}",
+ "doubleSummary": {
+ "dataPoints": [
+ {
+ "labels": [
+ {
+ "key": "Namespace",
+ "value": "AWS/S3"
+ },
+ {
+ "key": "MetricName",
+ "value": "GetRequests"
+ },
+ {
+ "key": "BucketName",
+ "value": "skywalking"
+ },
+ {
+ "key": "FilterId",
+ "value": "test"
+ }
+ ],
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 1.0,
+ "quantileValues": [
+ {
+ "value": 1.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 1.0
+ }
+ ]
+ }
+ ]
+ }
+ },
+ {
+ "name": "amazonaws.com/AWS/S3/AllRequests",
+ "unit": "{Count}",
+ "doubleSummary": {
+ "dataPoints": [
+ {
+ "labels": [
+ {
+ "key": "Namespace",
+ "value": "AWS/S3"
+ },
+ {
+ "key": "MetricName",
+ "value": "AllRequests"
+ },
+ {
+ "key": "BucketName",
+ "value": "skywalking"
+ },
+ {
+ "key": "FilterId",
+ "value": "test"
+ }
+ ],
+ "startTimeUnixNano": "1674547500000000000",
+ "timeUnixNano": "1674547560000000000",
+ "count": "1",
+ "sum": 1.0,
+ "quantileValues": [
+ {
+ "value": 1.0
+ },
+ {
+ "quantile": 1.0,
+ "value": 1.0
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}