You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/05 09:47:56 UTC

[skywalking] branch master updated: Polish aws-firehose-receiver to adapt existing OTEL proto (#10343)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new b991cfd125 Polish aws-firehose-receiver to adapt existing OTEL proto (#10343)
b991cfd125 is described below

commit b991cfd125da0a2597cf299a38e40b4fa5ec16d3
Author: pg.yang <pg...@hotmail.com>
AuthorDate: Sun Feb 5 17:47:48 2023 +0800

    Polish aws-firehose-receiver to adapt existing OTEL proto (#10343)
---
 LICENSE                                            |   1 +
 docs/en/setup/backend/aws-firehose-receiver.md     |   2 +-
 .../aws-firehose-receiver/pom.xml                  |  31 +
 .../receiver/aws/firehose/FirehoseHTTPHandler.java |   6 +-
 .../aws/firehose/OtelMetricsConvertor.java         | 344 +++++++++++
 .../proto/opentelemetry/proto/collector/README.md  |   9 +
 .../collector/metrics/v1/metrics_service.proto     |  45 ++
 .../collector/metrics/v1/metrics_service_http.yaml |   9 +
 .../opentelemetry/proto/common/v1/common.proto     |  78 +++
 .../proto/metrics/experimental/configservice.proto | 102 ++++
 .../opentelemetry/proto/metrics/v1/metrics.proto   | 636 +++++++++++++++++++++
 .../opentelemetry/proto/resource/v1/resource.proto |  34 ++
 .../aws/firehose/OtelMetricsConvertorTest.java     |  90 +++
 .../convertor-test-data/s3-data-1/expect.json      | 132 +++++
 .../convertor-test-data/s3-data-1/source.json      | 116 ++++
 .../convertor-test-data/s3-data-2/expect.json      | 285 +++++++++
 .../convertor-test-data/s3-data-2/source.json      | 245 ++++++++
 17 files changed, 2161 insertions(+), 4 deletions(-)

diff --git a/LICENSE b/LICENSE
index b073d89776..1429067582 100644
--- a/LICENSE
+++ b/LICENSE
@@ -220,6 +220,7 @@ The text of each license is the standard Apache 2.0 license.
    proto files from opencensus: https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-go  Apache 2.0
    proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
    proto files from opentelemetry: https://github.com/open-telemetry/opentelemetry-proto/tree/main/opentelemetry/proto Apache 2.0
+   proto files from opentelemetry: https://github.com/open-telemetry/opentelemetry-proto/tree/v0.7.0 Apache 2.0
    flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 2.0
    mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
    svg files from skywalking-ui/src/assets/icons: https://github.com/google/material-design-icons Apache 2.0
diff --git a/docs/en/setup/backend/aws-firehose-receiver.md b/docs/en/setup/backend/aws-firehose-receiver.md
index 43c27df277..e422c46f4d 100644
--- a/docs/en/setup/backend/aws-firehose-receiver.md
+++ b/docs/en/setup/backend/aws-firehose-receiver.md
@@ -5,7 +5,7 @@ You could leverage the receiver to collect [AWS CloudWatch metrics](https://docs
 
 ## Setup(S3 example)
 
-1. Create CloudWatch metrics configuration for S3 (refer to [S3 CloudWatch metrics](https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-request-metrics-bucket.html)) 
+1. Create CloudWatch metrics configuration for S3 (refer to [S3 CloudWatch metrics](https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-request-metrics-bucket.html))
 2. Stream CloudWatch metrics to AWS Kinesis Data Firehose delivery stream by [CloudWatch metrics stream](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-setup-datalake.html)
 3. Specify AWS Kinesis Data Firehose delivery stream HTTP Endpoint (refer to [Choose HTTP Endpoint for Your Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http))
 
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml b/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
index 1b86b698f9..03b9335a2a 100644
--- a/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
@@ -38,4 +38,35 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <!--
+                      The version of protoc must match protobuf-java. If you don't depend on
+                      protobuf-java directly, you will be transitively depending on the
+                      protobuf-java version that grpc depends on.
+                    -->
+                    <protocArtifact>com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier}
+                    </protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.42.1:exe:${os.detected.classifier}
+                    </pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
index 760630dd06..9a11018a94 100644
--- a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
@@ -23,7 +23,7 @@ import com.linecorp.armeria.common.HttpStatus;
 import com.linecorp.armeria.server.annotation.ConsumesJson;
 import com.linecorp.armeria.server.annotation.Post;
 import com.linecorp.armeria.server.annotation.ProducesJson;
-import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
 import java.io.ByteArrayInputStream;
 import java.util.Base64;
 import lombok.AllArgsConstructor;
@@ -33,7 +33,6 @@ import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRe
 @Slf4j
 @AllArgsConstructor
 public class FirehoseHTTPHandler {
-
     private final OpenTelemetryMetricRequestProcessor openTelemetryMetricRequestProcessor;
 
     @Post("/aws/firehose/metrics")
@@ -46,7 +45,8 @@ public class FirehoseHTTPHandler {
                     Base64.getDecoder().decode(record.getData()));
                 ExportMetricsServiceRequest request;
                 while ((request = ExportMetricsServiceRequest.parseDelimitedFrom(byteArrayInputStream)) != null) {
-                    openTelemetryMetricRequestProcessor.processMetricsRequest(request);
+                    openTelemetryMetricRequestProcessor.processMetricsRequest(
+                        OtelMetricsConvertor.convertExportMetricsRequest(request));
                 }
             }
         } catch (InvalidProtocolBufferException e) {
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertor.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertor.java
new file mode 100644
index 0000000000..0713680f2e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertor.java
@@ -0,0 +1,344 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.common.firehose.v0_7.ArrayValue;
+import io.opentelemetry.proto.common.firehose.v0_7.KeyValue;
+import io.opentelemetry.proto.common.firehose.v0_7.KeyValueList;
+import io.opentelemetry.proto.common.firehose.v0_7.StringKeyValue;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleGauge;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleHistogram;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleHistogramDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSum;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummary;
+import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummaryDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.InstrumentationLibraryMetrics;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntGauge;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntHistogram;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntHistogramDataPoint;
+import io.opentelemetry.proto.metrics.firehose.v0_7.IntSum;
+import io.opentelemetry.proto.metrics.firehose.v0_7.Metric;
+import io.opentelemetry.proto.metrics.firehose.v0_7.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.DataPointFlags;
+import io.opentelemetry.proto.metrics.v1.Gauge;
+import io.opentelemetry.proto.metrics.v1.Histogram;
+import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+import io.opentelemetry.proto.metrics.v1.Sum;
+import io.opentelemetry.proto.metrics.v1.Summary;
+import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
+import io.opentelemetry.proto.resource.v1.Resource;
+import java.util.Optional;
+
+public class OtelMetricsConvertor {
+
+    /**
+     * Converts an export request decoded with the OTEL 0.7 (firehose) proto classes into the
+     * OTEL v1 request type.
+     *
+     * @param sourceRequest request built from the 0.7 proto classes
+     * @return v1 request holding one converted ResourceMetrics per source ResourceMetrics
+     */
+    public static io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest convertExportMetricsRequest(
+        ExportMetricsServiceRequest sourceRequest) {
+        final io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest.Builder converted =
+            io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest.newBuilder();
+        sourceRequest.getResourceMetricsList()
+                     .stream()
+                     .map(OtelMetricsConvertor::convertResourceMetrics)
+                     .forEach(converted::addResourceMetrics);
+        return converted.build();
+    }
+
+    private static io.opentelemetry.proto.metrics.v1.ResourceMetrics convertResourceMetrics(final ResourceMetrics resourceMetrics) {
+        io.opentelemetry.proto.metrics.v1.ResourceMetrics.Builder targetResourceMetricsBuilder = io.opentelemetry.proto.metrics.v1.ResourceMetrics.newBuilder();
+        targetResourceMetricsBuilder.setResource(convertResource(resourceMetrics.getResource()));
+        for (InstrumentationLibraryMetrics instrumentationLibraryMetrics : resourceMetrics.getInstrumentationLibraryMetricsList()) {
+            targetResourceMetricsBuilder.addScopeMetrics(convertScopeMetrics(instrumentationLibraryMetrics));
+        }
+        return targetResourceMetricsBuilder.build();
+    }
+
+    /**
+     * Convert InstrumentationLibraryMetrics in OTEL 0.7 to ScopeMetrics.
+     * Notice only the contained metrics are converted, nothing else is copied from the source.
+     */
+    private static ScopeMetrics convertScopeMetrics(final InstrumentationLibraryMetrics instrumentationLibraryMetrics) {
+        final ScopeMetrics.Builder scopeMetrics = ScopeMetrics.newBuilder();
+        instrumentationLibraryMetrics.getMetricsList()
+                                     .stream()
+                                     .map(OtelMetricsConvertor::convertMetrics)
+                                     .forEach(scopeMetrics::addMetrics);
+        return scopeMetrics.build();
+    }
+
+    /**
+     * Convert Metric in OTEL 0.7 to the v1 Metric.
+     * <p>
+     * Name, description and unit are copied as-is. Only the data kind that is actually set on the
+     * source metric is converted. Protobuf getters return an empty default instance (never null)
+     * for message fields that are not set, so converting every kind unconditionally would call
+     * all target setters one after another and leave only the last one (an empty Summary for any
+     * non-summary metric) in the result.
+     * If no data kind is set on the source, the target metric carries no data either.
+     */
+    private static io.opentelemetry.proto.metrics.v1.Metric convertMetrics(final Metric metric) {
+        final io.opentelemetry.proto.metrics.v1.Metric.Builder builder = io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
+        // The setXxxBytes variants would only store the very same values a second time.
+        builder.setName(metric.getName());
+        builder.setDescription(metric.getDescription());
+        builder.setUnit(metric.getUnit());
+        if (metric.hasDoubleGauge()) {
+            builder.setGauge(convertDoubleGauge(metric.getDoubleGauge()));
+        } else if (metric.hasIntGauge()) {
+            builder.setGauge(convertIntGauge(metric.getIntGauge()));
+        } else if (metric.hasDoubleHistogram()) {
+            builder.setHistogram(convertDoubleHistogram(metric.getDoubleHistogram()));
+        } else if (metric.hasIntHistogram()) {
+            builder.setHistogram(convertIntHistogram(metric.getIntHistogram()));
+        } else if (metric.hasDoubleSum()) {
+            builder.setSum(convertDoubleSum(metric.getDoubleSum()));
+        } else if (metric.hasIntSum()) {
+            builder.setSum(convertIntSum(metric.getIntSum()));
+        } else if (metric.hasDoubleSummary()) {
+            builder.setSummary(convertDoubleSummary(metric.getDoubleSummary()));
+        }
+        return builder.build();
+    }
+
+    /**
+     * Convert DoubleSummary in OTEL 0.7 to Summary by converting each of its data points.
+     */
+    private static Summary convertDoubleSummary(final DoubleSummary doubleSummary) {
+        final Summary.Builder summary = Summary.newBuilder();
+        for (DoubleSummaryDataPoint point : doubleSummary.getDataPointsList()) {
+            summary.addDataPoints(convertDoubleSummaryDatapoint(point));
+        }
+        return summary.build();
+    }
+
+    /**
+     * Convert DoubleSummaryDataPoint in OTEL 0.7 to SummaryDataPoint.
+     * Labels become string attributes, quantile values are converted one by one, and the flags
+     * are always set to FLAG_NONE.
+     */
+    private static SummaryDataPoint convertDoubleSummaryDatapoint(final DoubleSummaryDataPoint doubleSummaryDataPoint) {
+        final SummaryDataPoint.Builder point = SummaryDataPoint.newBuilder();
+        for (StringKeyValue label : doubleSummaryDataPoint.getLabelsList()) {
+            point.addAttributes(convertStringKV(label));
+        }
+        for (DoubleSummaryDataPoint.ValueAtQuantile quantile : doubleSummaryDataPoint.getQuantileValuesList()) {
+            point.addQuantileValues(convertValueAtQuantile(quantile));
+        }
+        point.setStartTimeUnixNano(doubleSummaryDataPoint.getStartTimeUnixNano());
+        point.setTimeUnixNano(doubleSummaryDataPoint.getTimeUnixNano());
+        point.setCount(doubleSummaryDataPoint.getCount());
+        point.setSum(doubleSummaryDataPoint.getSum());
+        point.setFlags(DataPointFlags.FLAG_NONE_VALUE);
+        return point.build();
+    }
+
+    /**
+     * Convert ValueAtQuantile in OTEL 0.7 to the v1 ValueAtQuantile (quantile and value are copied).
+     */
+    private static SummaryDataPoint.ValueAtQuantile convertValueAtQuantile(final DoubleSummaryDataPoint.ValueAtQuantile valueAtQuantile) {
+        return SummaryDataPoint.ValueAtQuantile.newBuilder()
+                                               .setQuantile(valueAtQuantile.getQuantile())
+                                               .setValue(valueAtQuantile.getValue())
+                                               .build();
+    }
+
+    /**
+     * Convert IntSum in OTEL 0.7 to Sum.
+     * Besides the data points, the aggregation temporality and the monotonic flag are carried
+     * over, so the converted sum keeps the meaning of the source sum.
+     */
+    private static Sum convertIntSum(final IntSum intSum) {
+        final Sum.Builder builder = Sum.newBuilder();
+        // Copy the raw enum number, the same way the histogram conversions in this class do.
+        builder.setAggregationTemporalityValue(intSum.getAggregationTemporalityValue());
+        builder.setIsMonotonic(intSum.getIsMonotonic());
+        intSum.getDataPointsList()
+              .stream()
+              .map(OtelMetricsConvertor::convertIntDataPoint)
+              .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Convert DoubleSum in OTEL 0.7 to Sum.
+     * Besides the data points, the aggregation temporality and the monotonic flag are carried
+     * over, so the converted sum keeps the meaning of the source sum.
+     */
+    private static Sum convertDoubleSum(final DoubleSum doubleSum) {
+        final Sum.Builder builder = Sum.newBuilder();
+        // Copy the raw enum number, the same way the histogram conversions in this class do.
+        builder.setAggregationTemporalityValue(doubleSum.getAggregationTemporalityValue());
+        builder.setIsMonotonic(doubleSum.getIsMonotonic());
+        doubleSum.getDataPointsList()
+                 .stream()
+                 .map(OtelMetricsConvertor::convertDoubleDataPoint)
+                 .forEach(builder::addDataPoints);
+        return builder.build();
+    }
+
+    /**
+     * Convert DoubleDataPoint in OTEL 0.7 to NumberDataPoint, stored as a double value.
+     * Notice this method only copies labels, timestamps and the value of the DoubleDataPoint,
+     * any other field (e.g. Exemplar) is ignored.
+     */
+    private static NumberDataPoint convertDoubleDataPoint(final DoubleDataPoint doubleDataPoint) {
+        final NumberDataPoint.Builder point = NumberDataPoint.newBuilder();
+        for (StringKeyValue label : doubleDataPoint.getLabelsList()) {
+            point.addAttributes(convertStringKV(label));
+        }
+        point.setStartTimeUnixNano(doubleDataPoint.getStartTimeUnixNano());
+        point.setTimeUnixNano(doubleDataPoint.getTimeUnixNano());
+        point.setAsDouble(doubleDataPoint.getValue());
+        return point.build();
+    }
+
+    /**
+     * Convert IntDataPoint in OTEL 0.7 to NumberDataPoint, stored as an int value.
+     * Notice this method only copies labels, timestamps and the value of the IntDataPoint.
+     */
+    private static NumberDataPoint convertIntDataPoint(final IntDataPoint intDataPoint) {
+        final NumberDataPoint.Builder point = NumberDataPoint.newBuilder();
+        for (StringKeyValue label : intDataPoint.getLabelsList()) {
+            point.addAttributes(convertStringKV(label));
+        }
+        point.setStartTimeUnixNano(intDataPoint.getStartTimeUnixNano());
+        point.setTimeUnixNano(intDataPoint.getTimeUnixNano());
+        point.setAsInt(intDataPoint.getValue());
+        return point.build();
+    }
+
+    private static io.opentelemetry.proto.common.v1.KeyValue convertStringKV(final StringKeyValue stringKeyValue) {
+        return io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
+                                                        .setKey(stringKeyValue.getKey())
+                                                        .setValue(
+                                                            AnyValue.newBuilder()
+                                                                    .setStringValue(stringKeyValue.getValue()))
+                                                        .build();
+    }
+
+    /**
+     * Convert IntHistogram in OTEL 0.7 to Histogram, keeping the aggregation temporality and
+     * converting every data point.
+     *
+     * @throws UnsupportedOperationException if the aggregation temporality can't be converted
+     */
+    private static Histogram convertIntHistogram(final IntHistogram intHistogram) {
+        final Histogram.Builder histogram = Histogram.newBuilder();
+        histogram.setAggregationTemporality(convertAggregationTemporality(intHistogram.getAggregationTemporality()));
+        histogram.setAggregationTemporalityValue(intHistogram.getAggregationTemporalityValue());
+        for (IntHistogramDataPoint point : intHistogram.getDataPointsList()) {
+            histogram.addDataPoints(convertIntHistogramDataPoint(point));
+        }
+        return histogram.build();
+    }
+
+    /**
+     * Convert IntHistogramDataPoint in OTEL 0.7 to HistogramDataPoint
+     * Notice this method ignore min, max, Exemplar fields in HistogramDataPoint
+     * <p>
+     * Labels become string attributes, the flags are always set to FLAG_NONE, and every bucket
+     * count is copied together with the explicit bounds.
+     */
+    private static HistogramDataPoint convertIntHistogramDataPoint(final IntHistogramDataPoint intHistogramDataPoint) {
+        final HistogramDataPoint.Builder builder = HistogramDataPoint.newBuilder();
+        intHistogramDataPoint.getLabelsList()
+                             .stream()
+                             .map(OtelMetricsConvertor::convertStringKV)
+                             .forEach(builder::addAttributes);
+
+        builder.setCount(intHistogramDataPoint.getCount());
+        builder.setSum(intHistogramDataPoint.getSum());
+        builder.setStartTimeUnixNano(intHistogramDataPoint.getStartTimeUnixNano());
+        builder.setTimeUnixNano(intHistogramDataPoint.getTimeUnixNano());
+        // Copy the bucket values themselves; getBucketCountsCount() is only the number of buckets.
+        builder.addAllBucketCounts(intHistogramDataPoint.getBucketCountsList());
+        builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
+        builder.addAllExplicitBounds(intHistogramDataPoint.getExplicitBoundsList());
+        return builder.build();
+    }
+
+    /**
+     * Convert DoubleHistogramDataPoint in OTEL 0.7 to HistogramDataPoint
+     * Notice this method ignore min, max, Exemplar fields in HistogramDataPoint
+     * <p>
+     * Labels become string attributes, the flags are always set to FLAG_NONE, and every bucket
+     * count is copied together with the explicit bounds.
+     */
+    private static HistogramDataPoint convertDoubleHistogramDataPoint(final DoubleHistogramDataPoint doubleHistogramDataPoint) {
+        final HistogramDataPoint.Builder builder = HistogramDataPoint.newBuilder();
+        doubleHistogramDataPoint.getLabelsList()
+                                .stream()
+                                .map(OtelMetricsConvertor::convertStringKV)
+                                .forEach(builder::addAttributes);
+        builder.setCount(doubleHistogramDataPoint.getCount());
+        builder.setSum(doubleHistogramDataPoint.getSum());
+        builder.setStartTimeUnixNano(doubleHistogramDataPoint.getStartTimeUnixNano());
+        builder.setTimeUnixNano(doubleHistogramDataPoint.getTimeUnixNano());
+        // Copy the bucket values themselves; getBucketCountsCount() is only the number of buckets.
+        builder.addAllBucketCounts(doubleHistogramDataPoint.getBucketCountsList());
+        builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
+        builder.addAllExplicitBounds(doubleHistogramDataPoint.getExplicitBoundsList());
+        return builder.build();
+    }
+
+    private static AggregationTemporality convertAggregationTemporality(final io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality aggregationTemporality) {
+
+        if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_UNSPECIFIED) {
+            return AggregationTemporality.AGGREGATION_TEMPORALITY_UNSPECIFIED;
+        }
+        if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE) {
+            return AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
+        }
+
+        if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) {
+            return AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
+        }
+        throw new UnsupportedOperationException("Can't convert " + aggregationTemporality);
+    }
+
+    /**
+     * Convert DoubleHistogram in OTEL 0.7 to Histogram, keeping the aggregation temporality and
+     * converting every data point.
+     *
+     * @throws UnsupportedOperationException if the aggregation temporality can't be converted
+     */
+    private static Histogram convertDoubleHistogram(final DoubleHistogram doubleHistogram) {
+        final Histogram.Builder histogram = Histogram.newBuilder();
+        histogram.setAggregationTemporality(convertAggregationTemporality(doubleHistogram.getAggregationTemporality()));
+        histogram.setAggregationTemporalityValue(doubleHistogram.getAggregationTemporalityValue());
+        for (DoubleHistogramDataPoint point : doubleHistogram.getDataPointsList()) {
+            histogram.addDataPoints(convertDoubleHistogramDataPoint(point));
+        }
+        return histogram.build();
+    }
+
+    /**
+     * Convert IntGauge in OTEL 0.7 to Gauge by converting each of its data points.
+     */
+    private static Gauge convertIntGauge(final IntGauge intGauge) {
+        final Gauge.Builder gauge = Gauge.newBuilder();
+        for (IntDataPoint point : intGauge.getDataPointsList()) {
+            gauge.addDataPoints(convertIntDataPoint(point));
+        }
+        return gauge.build();
+    }
+
+    /**
+     * Convert DoubleGauge in OTEL 0.7 to Gauge by converting each of its data points.
+     */
+    private static Gauge convertDoubleGauge(final DoubleGauge doubleGauge) {
+        final Gauge.Builder gauge = Gauge.newBuilder();
+        for (DoubleDataPoint point : doubleGauge.getDataPointsList()) {
+            gauge.addDataPoints(convertDoubleDataPoint(point));
+        }
+        return gauge.build();
+    }
+
+    /**
+     * Convert Resource in OTEL 0.7 to the v1 Resource.
+     * Notice only the attributes are converted.
+     */
+    private static Resource convertResource(final io.opentelemetry.proto.resource.firehose.v0_7.Resource resource) {
+        final Resource.Builder converted = Resource.newBuilder();
+        resource.getAttributesList()
+                .stream()
+                .map(OtelMetricsConvertor::convertKeyValue)
+                .forEach(converted::addAttributes);
+        return converted.build();
+    }
+
+    /**
+     * Convert AnyValue in OTEL 0.7 to the v1 AnyValue.
+     * The member of the source "value" oneof that is set is copied to the same member of the
+     * target; array and kvlist values are converted recursively. If nothing is set, an empty
+     * AnyValue is returned.
+     */
+    private static AnyValue convertAnyValue(final io.opentelemetry.proto.common.firehose.v0_7.AnyValue value) {
+        final AnyValue.Builder converted = AnyValue.newBuilder();
+        switch (value.getValueCase()) {
+            case BOOL_VALUE:
+                converted.setBoolValue(value.getBoolValue());
+                break;
+            case DOUBLE_VALUE:
+                converted.setDoubleValue(value.getDoubleValue());
+                break;
+            case INT_VALUE:
+                converted.setIntValue(value.getIntValue());
+                break;
+            case STRING_VALUE:
+                converted.setStringValue(value.getStringValue());
+                break;
+            case ARRAY_VALUE:
+                converted.setArrayValue(convertValuList(value.getArrayValue()));
+                break;
+            case KVLIST_VALUE:
+                converted.setKvlistValue(convertKvlistValue(value.getKvlistValue()));
+                break;
+            default:
+                // No member of the oneof is set, keep the target empty as well.
+                break;
+        }
+        return converted.build();
+    }
+
+    /**
+     * Convert KeyValueList in OTEL 0.7 to the v1 KeyValueList by converting each entry.
+     */
+    private static io.opentelemetry.proto.common.v1.KeyValueList convertKvlistValue(final KeyValueList keyValueList) {
+        final io.opentelemetry.proto.common.v1.KeyValueList.Builder converted =
+            io.opentelemetry.proto.common.v1.KeyValueList.newBuilder();
+        for (KeyValue entry : keyValueList.getValuesList()) {
+            converted.addValues(convertKeyValue(entry));
+        }
+        return converted.build();
+    }
+
+    private static io.opentelemetry.proto.common.v1.KeyValue convertKeyValue(final KeyValue keyValue) {
+        return io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
+                                                        .setKey(keyValue.getKey())
+                                                        .setValue(convertAnyValue(keyValue.getValue()))
+                                                        .build();
+    }
+
+    /**
+     * Convert ArrayValue in OTEL 0.7 to the v1 ArrayValue by converting each element.
+     */
+    private static io.opentelemetry.proto.common.v1.ArrayValue convertValuList(final ArrayValue arrayValue) {
+        final io.opentelemetry.proto.common.v1.ArrayValue.Builder converted =
+            io.opentelemetry.proto.common.v1.ArrayValue.newBuilder();
+        for (io.opentelemetry.proto.common.firehose.v0_7.AnyValue element : arrayValue.getValuesList()) {
+            converted.addValues(convertAnyValue(element));
+        }
+        return converted.build();
+    }
+
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/README.md b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/README.md
new file mode 100644
index 0000000000..4a73a31ed8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/README.md
@@ -0,0 +1,9 @@
+# OpenTelemetry Collector Proto
+
+This package describes the OpenTelemetry collector protocol.
+
+## Packages
+
+1. `common` package contains the common messages shared between different services.
+2. `trace` package contains the Trace Service protos.
+3. `metrics` package contains the Metrics Service protos.
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto
new file mode 100644
index 0000000000..6989cf01ff
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto
@@ -0,0 +1,45 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.collector.metrics.v1;
+
+import "opentelemetry/proto/metrics/v1/metrics.proto";
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.collector.metrics.firehose.v0_7";
+option java_outer_classname = "MetricsServiceProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1";
+
+// Service that can be used to push metrics between one Application
+// instrumented with OpenTelemetry and a collector, or between a collector and a
+// central collector.
+//
+// Export is a unary RPC: it takes a single ExportMetricsServiceRequest and
+// returns a single ExportMetricsServiceResponse.
+service MetricsService {
+  // For performance reasons, it is recommended to keep this RPC
+  // alive for the entire life of the application.
+  rpc Export(ExportMetricsServiceRequest) returns (ExportMetricsServiceResponse) {}
+}
+
+// Request message of MetricsService.Export.
+// Per the java_package option of this file, the Java class for this message is
+// generated into io.opentelemetry.proto.collector.metrics.firehose.v0_7.
+message ExportMetricsServiceRequest {
+  // An array of ResourceMetrics.
+  // For data coming from a single resource this array will typically contain one
+  // element. Intermediary nodes (such as OpenTelemetry Collector) that receive
+  // data from multiple origins typically batch the data before forwarding further and
+  // in that case this array will contain multiple elements.
+  repeated opentelemetry.proto.metrics.v1.ResourceMetrics resource_metrics = 1;
+}
+
+// Response message of MetricsService.Export. It declares no fields.
+message ExportMetricsServiceResponse {
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service_http.yaml b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service_http.yaml
new file mode 100644
index 0000000000..a545650260
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/collector/metrics/v1/metrics_service_http.yaml
@@ -0,0 +1,9 @@
+# This is an API configuration to generate an HTTP/JSON -> gRPC gateway for the
+# OpenTelemetry service using github.com/grpc-ecosystem/grpc-gateway.
+type: google.api.Service
+config_version: 3
+http:
+ rules:
+ - selector: opentelemetry.proto.collector.metrics.v1.MetricsService.Export
+   post: /v1/metrics
+   body: "*"
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/common/v1/common.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/common/v1/common.proto
new file mode 100644
index 0000000000..340c63105f
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/common/v1/common.proto
@@ -0,0 +1,78 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.common.v1;
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.common.firehose.v0_7";
+option java_outer_classname = "CommonProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1";
+
+// AnyValue is used to represent any type of attribute value. AnyValue may contain a
+// primitive value such as a string or integer or it may contain an arbitrary nested
+// object containing arrays, key-value lists and primitives.
+message AnyValue {
+  // The value is one of the listed fields. It is valid for all values to be unspecified
+  // in which case this AnyValue is considered to be "null".
+  oneof value {
+    string string_value = 1;
+    bool bool_value = 2;
+    int64 int_value = 3;
+    double double_value = 4;
+    ArrayValue array_value = 5;
+    KeyValueList kvlist_value = 6;
+  }
+}
+
+// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message
+// since oneof in AnyValue does not allow repeated fields.
+message ArrayValue {
+  // Array of values. The array may be empty (contain 0 elements).
+  repeated AnyValue values = 1;
+}
+
+// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message
+// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need
+// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to
+// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches
+// are semantically equivalent.
+message KeyValueList {
+  // A collection of key/value pairs. The list may be empty (may
+  // contain 0 elements).
+  repeated KeyValue values = 1;
+}
+
+// KeyValue is a key-value pair that is used to store Span attributes, Link
+// attributes, etc.
+message KeyValue {
+  string key = 1;
+  AnyValue value = 2;
+}
+
+// StringKeyValue is a pair of key/value strings. This is the simpler (and faster) version
+// of KeyValue that only supports string values.
+message StringKeyValue {
+  string key = 1;
+  string value = 2;
+}
+
+// InstrumentationLibrary is a message representing the instrumentation library information
+// such as the fully qualified name and version. 
+message InstrumentationLibrary {
+  // An empty instrumentation library name means the name is unknown. 
+  string name = 1;
+  string version = 2;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/experimental/configservice.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/experimental/configservice.proto
new file mode 100644
index 0000000000..a6415b5066
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/experimental/configservice.proto
@@ -0,0 +1,102 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.metrics.experimental;
+
+import "opentelemetry/proto/resource/v1/resource.proto";
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.metrics.firehose.v0_7.experimental";
+option java_outer_classname = "MetricConfigServiceProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/experimental";
+
+// MetricConfig is a service that enables updating metric schedules, trace
+// parameters, and other configurations on the SDK without having to restart the
+// instrumented application. The collector can also serve as the configuration
+// service, acting as a bridge between third-party configuration services and
+// the SDK, piping updated configs from a third-party source to an instrumented
+// application.
+service MetricConfig {
+  rpc GetMetricConfig (MetricConfigRequest) returns (MetricConfigResponse);
+}
+
+message MetricConfigRequest{
+
+  // Required. The resource for which configuration should be returned.
+  opentelemetry.proto.resource.v1.Resource resource = 1;
+
+  // Optional. The value of MetricConfigResponse.fingerprint for the last
+  // configuration that the caller received and successfully applied.
+  bytes last_known_fingerprint = 2;
+}
+
+message MetricConfigResponse {
+
+  // Optional. The fingerprint associated with this MetricConfigResponse. Each
+  // change in configs yields a different fingerprint. The resource SHOULD copy
+  // this value to MetricConfigRequest.last_known_fingerprint for the next
+  // configuration request. If there are no changes between fingerprint and
+  // MetricConfigRequest.last_known_fingerprint, then all other fields besides
+  // fingerprint in the response are optional, or the same as the last update if
+  // present.
+  //
+  // The exact mechanics of generating the fingerprint is up to the
+  // implementation. However, a fingerprint must be deterministically determined
+  // by the configurations -- the same configuration will generate the same
+  // fingerprint on any instance of an implementation. Hence using a timestamp is
+  // unacceptable, but a deterministic hash is fine.
+  bytes fingerprint = 1;
+
+  // A Schedule is used to apply a particular scheduling configuration to
+  // a metric. If a metric name matches a schedule's patterns, then the metric
+  // adopts the configuration specified by the schedule.
+  message Schedule {
+
+    // A light-weight pattern that can match 1 or more
+    // metrics, for which this schedule will apply. The string is used to
+    // match against metric names. It should not exceed 100k characters.
+    message Pattern {
+      oneof match {
+        string equals = 1;       // matches the metric name exactly
+        string starts_with = 2;  // prefix-matches the metric name
+      }
+    }
+
+    // Metrics with names that match a rule in the inclusion_patterns are
+    // targeted by this schedule. Metrics that match the exclusion_patterns
+    // are not targeted for this schedule, even if they match an inclusion
+    // pattern.
+    repeated Pattern exclusion_patterns = 1;
+    repeated Pattern inclusion_patterns = 2;
+
+    // Describes the collection period for each metric in seconds.
+    // A period of 0 means to not export.
+    int32 period_sec = 3;
+  }
+
+  // A single metric may match multiple schedules. In such cases, the schedule
+  // that specifies the smallest period is applied.
+  //
+  // Note, for optimization purposes, it is recommended to use as few schedules
+  // as possible to capture all required metric updates. Where you can be
+  // conservative, do take full advantage of the inclusion/exclusion patterns to
+  // capture as many of your targeted metrics as possible.
+  repeated Schedule schedules = 2;
+
+  // Optional. The client is suggested to wait this long (in seconds) before
+  // pinging the configuration service again.
+  int32 suggested_wait_time_sec = 3;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/v1/metrics.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/v1/metrics.proto
new file mode 100644
index 0000000000..f0a76125e9
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/metrics/v1/metrics.proto
@@ -0,0 +1,636 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.metrics.v1;
+
+import "opentelemetry/proto/common/v1/common.proto";
+import "opentelemetry/proto/resource/v1/resource.proto";
+
+option java_multiple_files = true;
+option java_package = "io.opentelemetry.proto.metrics.firehose.v0_7";
+option java_outer_classname = "MetricsProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1";
+
+// A collection of InstrumentationLibraryMetrics from a Resource.
+message ResourceMetrics {
+  // The resource for the metrics in this message.
+  // If this field is not set then no resource info is known.
+  opentelemetry.proto.resource.v1.Resource resource = 1;
+
+  // A list of metrics that originate from a resource.
+  repeated InstrumentationLibraryMetrics instrumentation_library_metrics = 2;
+}
+
+// A collection of Metrics produced by an InstrumentationLibrary.
+message InstrumentationLibraryMetrics {
+  // The instrumentation library information for the metrics in this message.
+  // Semantically when InstrumentationLibrary isn't set, it is equivalent with
+  // an empty instrumentation library name (unknown).
+  opentelemetry.proto.common.v1.InstrumentationLibrary instrumentation_library = 1;
+
+  // A list of metrics that originate from an instrumentation library.
+  repeated Metric metrics = 2;
+}
+
+// Defines a Metric which has one or more timeseries.
+//
+// The data model and relation between entities is shown in the
+// diagram below. Here, "DataPoint" is the term used to refer to any
+// one of the specific data point value types, and "points" is the term used
+// to refer to any one of the lists of points contained in the Metric.
+//
+// - Metric is composed of a metadata and data.
+// - Metadata part contains a name, description, unit.
+// - Data is one of the possible types (Gauge, Sum, Histogram, etc.).
+// - DataPoint contains timestamps, labels, and one of the possible value type
+//   fields.
+//
+//     Metric
+//  +------------+
+//  |name        |
+//  |description |
+//  |unit        |     +------------------------------------+
+//  |data        |---> |Gauge, Sum, Histogram, Summary, ... |
+//  +------------+     +------------------------------------+
+//
+//    Data [One of Gauge, Sum, Histogram, Summary, ...]
+//  +-----------+
+//  |...        |  // Metadata about the Data.
+//  |points     |--+
+//  +-----------+  |
+//                 |      +---------------------------+
+//                 |      |DataPoint 1                |
+//                 v      |+------+------+   +------+ |
+//              +-----+   ||label |label |...|label | |
+//              |  1  |-->||value1|value2|...|valueN| |
+//              +-----+   |+------+------+   +------+ |
+//              |  .  |   |+-----+                    |
+//              |  .  |   ||value|                    |
+//              |  .  |   |+-----+                    |
+//              |  .  |   +---------------------------+
+//              |  .  |                   .
+//              |  .  |                   .
+//              |  .  |                   .
+//              |  .  |   +---------------------------+
+//              |  .  |   |DataPoint M                |
+//              +-----+   |+------+------+   +------+ |
+//              |  M  |-->||label |label |...|label | |
+//              +-----+   ||value1|value2|...|valueN| |
+//                        |+------+------+   +------+ |
+//                        |+-----+                    |
+//                        ||value|                    |
+//                        |+-----+                    |
+//                        +---------------------------+
+//
+// All DataPoint types have three common fields:
+// - Labels: zero or more key-value pairs associated with the data point.
+// - StartTimeUnixNano MUST be set to the start of the interval when the data's
+//   type includes an AggregationTemporality. This field is not set otherwise.
+// - TimeUnixNano MUST be set to:
+//   - the moment when an aggregation is reported (independent of the
+//     aggregation temporality).
+//   - the instantaneous time of the event.
+message Metric {
+  // name of the metric, including its DNS name prefix. It must be unique.
+  string name = 1;
+
+  // description of the metric, which can be used in documentation.
+  string description = 2;
+
+  // unit in which the metric value is reported. Follows the format
+  // described by http://unitsofmeasure.org/ucum.html.
+  string unit = 3;
+
+  // TODO: Decide if support for RawMeasurements (measurements recorded using
+  // the synchronous instruments) is necessary. It can be used to delegate the
+  // aggregation from the application to the agent/collector. See
+  // https://github.com/open-telemetry/opentelemetry-specification/issues/617
+
+  // Data determines the aggregation type (if any) of the metric, what is the
+  // reported value type for the data points, as well as the relationship to
+  // the time interval over which they are reported.
+  //
+  // TODO: Update table after the decision on:
+  // https://github.com/open-telemetry/opentelemetry-specification/issues/731.
+  // By default, metrics recording using the OpenTelemetry API are exported as
+  // (the table does not include MeasurementValueType to avoid extra rows):
+  //
+  //   Instrument         Type
+  //   ----------------------------------------------
+  //   Counter            Sum(aggregation_temporality=delta;is_monotonic=true)
+  //   UpDownCounter      Sum(aggregation_temporality=delta;is_monotonic=false)
+  //   ValueRecorder      TBD
+  //   SumObserver        Sum(aggregation_temporality=cumulative;is_monotonic=true)
+  //   UpDownSumObserver  Sum(aggregation_temporality=cumulative;is_monotonic=false)
+  //   ValueObserver      Gauge()
+  oneof data {
+    IntGauge int_gauge = 4;
+    DoubleGauge double_gauge = 5;
+    IntSum int_sum = 6;
+    DoubleSum double_sum = 7;
+    IntHistogram int_histogram = 8;
+    DoubleHistogram double_histogram = 9;
+    DoubleSummary double_summary = 11;
+  }
+}
+
+// Gauge represents the type of an int scalar metric that always exports the
+// "current value" for every data point. It should be used for an "unknown"
+// aggregation.
+// 
+// A Gauge does not support different aggregation temporalities. Given the
+// aggregation is unknown, points cannot be combined using the same
+// aggregation, regardless of aggregation temporalities. Therefore,
+// AggregationTemporality is not included. Consequently, this also means
+// "StartTimeUnixNano" is ignored for all data points.
+message IntGauge {
+  repeated IntDataPoint data_points = 1;
+}
+
+// Gauge represents the type of a double scalar metric that always exports the
+// "current value" for every data point. It should be used for an "unknown"
+// aggregation.
+// 
+// A Gauge does not support different aggregation temporalities. Given the
+// aggregation is unknown, points cannot be combined using the same
+// aggregation, regardless of aggregation temporalities. Therefore,
+// AggregationTemporality is not included. Consequently, this also means
+// "StartTimeUnixNano" is ignored for all data points.
+message DoubleGauge {
+  repeated DoubleDataPoint data_points = 1;
+}
+
+// Sum represents the type of a numeric int scalar metric that is calculated as
+// a sum of all reported measurements over a time interval.
+message IntSum {
+  repeated IntDataPoint data_points = 1;
+
+  // aggregation_temporality describes if the aggregator reports delta changes
+  // since last report time, or cumulative changes since a fixed start time.
+  AggregationTemporality aggregation_temporality = 2;
+
+  // If "true" means that the sum is monotonic.
+  bool is_monotonic = 3;
+}
+
+// Sum represents the type of a numeric double scalar metric that is calculated
+// as a sum of all reported measurements over a time interval.
+message DoubleSum {
+  repeated DoubleDataPoint data_points = 1;
+  
+  // aggregation_temporality describes if the aggregator reports delta changes
+  // since last report time, or cumulative changes since a fixed start time.
+  AggregationTemporality aggregation_temporality = 2;
+
+  // If "true" means that the sum is monotonic.
+  bool is_monotonic = 3;
+}
+
+// Represents the type of a metric that is calculated by aggregating as a
+// Histogram of all reported int measurements over a time interval.
+message IntHistogram {
+  repeated IntHistogramDataPoint data_points = 1;
+
+  // aggregation_temporality describes if the aggregator reports delta changes
+  // since last report time, or cumulative changes since a fixed start time.
+  AggregationTemporality aggregation_temporality = 2;
+}
+
+// Represents the type of a metric that is calculated by aggregating as a
+// Histogram of all reported double measurements over a time interval.
+message DoubleHistogram {
+  repeated DoubleHistogramDataPoint data_points = 1;
+
+  // aggregation_temporality describes if the aggregator reports delta changes
+  // since last report time, or cumulative changes since a fixed start time.
+  AggregationTemporality aggregation_temporality = 2;
+}
+
+// DoubleSummary metric data are used to convey quantile summaries,
+// a Prometheus (see: https://prometheus.io/docs/concepts/metric_types/#summary)
+// and OpenMetrics (see: https://github.com/OpenObservability/OpenMetrics/blob/4dbf6075567ab43296eed941037c12951faafb92/protos/prometheus.proto#L45)
+// data type. These data points cannot always be merged in a meaningful way.
+// While they can be useful in some applications, histogram data points are
+// recommended for new applications.
+message DoubleSummary {
+  repeated DoubleSummaryDataPoint data_points = 1;
+}
+
+// AggregationTemporality defines how a metric aggregator reports aggregated
+// values. It describes how those values relate to the time interval over
+// which they are aggregated.
+enum AggregationTemporality {
+  // UNSPECIFIED is the default AggregationTemporality, it MUST not be used.
+  AGGREGATION_TEMPORALITY_UNSPECIFIED = 0;
+
+  // DELTA is an AggregationTemporality for a metric aggregator which reports
+  // changes since last report time. Successive metrics contain aggregation of
+  // values from continuous and non-overlapping intervals.
+  //
+  // The values for a DELTA metric are based only on the time interval
+  // associated with one measurement cycle. There is no dependency on
+  // previous measurements like is the case for CUMULATIVE metrics.
+  //
+  // For example, consider a system measuring the number of requests that
+  // it receives and reports the sum of these requests every second as a
+  // DELTA metric:
+  //
+  //   1. The system starts receiving at time=t_0.
+  //   2. A request is received, the system measures 1 request.
+  //   3. A request is received, the system measures 1 request.
+  //   4. A request is received, the system measures 1 request.
+  //   5. The 1 second collection cycle ends. A metric is exported for the
+  //      number of requests received over the interval of time t_0 to
+  //      t_0+1 with a value of 3.
+  //   6. A request is received, the system measures 1 request.
+  //   7. A request is received, the system measures 1 request.
+  //   8. The 1 second collection cycle ends. A metric is exported for the
+  //      number of requests received over the interval of time t_0+1 to
+  //      t_0+2 with a value of 2.
+  AGGREGATION_TEMPORALITY_DELTA = 1;
+
+  // CUMULATIVE is an AggregationTemporality for a metric aggregator which
+  // reports changes since a fixed start time. This means that current values
+  // of a CUMULATIVE metric depend on all previous measurements since the
+  // start time. Because of this, the sender is required to retain this state
+  // in some form. If this state is lost or invalidated, the CUMULATIVE metric
+  // values MUST be reset and a new fixed start time following the last
+  // reported measurement time sent MUST be used.
+  //
+  // For example, consider a system measuring the number of requests that
+  // it receives and reports the sum of these requests every second as a
+  // CUMULATIVE metric:
+  //
+  //   1. The system starts receiving at time=t_0.
+  //   2. A request is received, the system measures 1 request.
+  //   3. A request is received, the system measures 1 request.
+  //   4. A request is received, the system measures 1 request.
+  //   5. The 1 second collection cycle ends. A metric is exported for the
+  //      number of requests received over the interval of time t_0 to
+  //      t_0+1 with a value of 3.
+  //   6. A request is received, the system measures 1 request.
+  //   7. A request is received, the system measures 1 request.
+  //   8. The 1 second collection cycle ends. A metric is exported for the
+  //      number of requests received over the interval of time t_0 to
+  //      t_0+2 with a value of 5.
+  //   9. The system experiences a fault and loses state.
+  //   10. The system recovers and resumes receiving at time=t_1.
+  //   11. A request is received, the system measures 1 request.
+  //   12. The 1 second collection cycle ends. A metric is exported for the
+  //      number of requests received over the interval of time t_1 to
+  //      t_1+1 with a value of 1.
+  //
+  // Note: Even though, when reporting changes since last report time, using 
+  // CUMULATIVE is valid, it is not recommended. This may cause problems for
+  // systems that do not use start_time to determine when the aggregation
+  // value was reset (e.g. Prometheus).
+  AGGREGATION_TEMPORALITY_CUMULATIVE = 2;
+}
+
+// IntDataPoint is a single data point in a timeseries that describes the
+// time-varying values of an int64 metric.
+message IntDataPoint {
+  // The set of labels that uniquely identify this timeseries.
+  repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+  // start_time_unix_nano is the last time when the aggregation value was reset
+  // to "zero". For some metric types this is ignored, see data types for more
+  // details.
+  //
+  // The aggregation value is over the time interval (start_time_unix_nano,
+  // time_unix_nano].
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  //
+  // Value of 0 indicates that the timestamp is unspecified. In that case the
+  // timestamp may be decided by the backend.
+  fixed64 start_time_unix_nano = 2;
+
+  // time_unix_nano is the moment when this aggregation value was reported.
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  fixed64 time_unix_nano = 3;
+
+  // value itself.
+  sfixed64 value = 4;
+
+  // (Optional) List of exemplars collected from
+  // measurements that were used to form the data point
+  repeated IntExemplar exemplars = 5;
+}
+
+// DoubleDataPoint is a single data point in a timeseries that describes the
+// time-varying value of a double metric.
+message DoubleDataPoint {
+  // The set of labels that uniquely identify this timeseries.
+  repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+  // start_time_unix_nano is the last time when the aggregation value was reset
+  // to "zero". For some metric types this is ignored, see data types for more
+  // details.
+  //
+  // The aggregation value is over the time interval (start_time_unix_nano,
+  // time_unix_nano].
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  //
+  // Value of 0 indicates that the timestamp is unspecified. In that case the
+  // timestamp may be decided by the backend.
+  fixed64 start_time_unix_nano = 2;
+
+  // time_unix_nano is the moment when this aggregation value was reported.
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  fixed64 time_unix_nano = 3;
+
+  // value itself.
+  double value = 4;
+
+  // (Optional) List of exemplars collected from
+  // measurements that were used to form the data point
+  repeated DoubleExemplar exemplars = 5;
+}
+
+// IntHistogramDataPoint is a single data point in a timeseries that describes
+// the time-varying values of a Histogram of int values. A Histogram contains
+// summary statistics for a population of values, it may optionally contain
+// the distribution of those values across a set of buckets.
+message IntHistogramDataPoint {
+  // The set of labels that uniquely identify this timeseries.
+  repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+  // start_time_unix_nano is the last time when the aggregation value was reset
+  // to "zero". For some metric types this is ignored, see data types for more
+  // details.
+  //
+  // The aggregation value is over the time interval (start_time_unix_nano,
+  // time_unix_nano].
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  //
+  // Value of 0 indicates that the timestamp is unspecified. In that case the
+  // timestamp may be decided by the backend.
+  fixed64 start_time_unix_nano = 2;
+
+  // time_unix_nano is the moment when this aggregation value was reported.
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  fixed64 time_unix_nano = 3;
+
+  // count is the number of values in the population. Must be non-negative. This
+  // value must be equal to the sum of the "count" fields in buckets if a
+  // histogram is provided.
+  fixed64 count = 4;
+
+  // sum of the values in the population. If count is zero then this field
+  // must be zero. This value must be equal to the sum of the "sum" fields in
+  // buckets if a histogram is provided.
+  sfixed64 sum = 5;
+
+  // bucket_counts is an optional field that contains the histogram count values
+  // for each bucket.
+  //
+  // The sum of the bucket_counts must equal the value in the count field.
+  //
+  // The number of elements in bucket_counts array must be one greater than
+  // the number of elements in explicit_bounds array.
+  repeated fixed64 bucket_counts = 6;
+
+  // A histogram may optionally contain the distribution of the values in the population.
+  // In that case one of the option fields below and "buckets" field both must be defined.
+  // Otherwise all option fields and "buckets" field must be omitted in which case the
+  // distribution of values in the histogram is unknown and only the total count and sum are known.
+
+  // explicit_bounds is the only supported bucket option currently.
+  // TODO: Add more bucket options.
+
+  // explicit_bounds specifies buckets with explicitly defined bounds for values.
+  // The bucket boundaries are described by "bounds" field.
+  //
+  // This defines size(bounds) + 1 (= N) buckets. The boundaries for bucket
+  // at index i are:
+  //
+  // (-infinity, bounds[i]) for i == 0
+  // [bounds[i-1], bounds[i]) for 0 < i < N-1
+  // [bounds[i-1], +infinity) for i == N-1
+  // The values in bounds array must be strictly increasing.
+  //
+  // Note: only [a, b) intervals are currently supported for each bucket except the first one.
+  // If we decide to also support (a, b] intervals we should add support for these by defining
+  // a boolean value which decides what type of intervals to use.
+  repeated double explicit_bounds = 7;
+
+  // (Optional) List of exemplars collected from
+  // measurements that were used to form the data point
+  repeated IntExemplar exemplars = 8;
+}
+
+// DoubleHistogramDataPoint is a single data point in a timeseries that describes the
+// time-varying values of a Histogram of double values. A Histogram contains
+// summary statistics for a population of values, it may optionally contain the
+// distribution of those values across a set of buckets.
+message DoubleHistogramDataPoint {
+  // The set of labels that uniquely identify this timeseries.
+  repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+  // start_time_unix_nano is the last time when the aggregation value was reset
+  // to "zero". For some metric types this is ignored, see data types for more
+  // details.
+  //
+  // The aggregation value is over the time interval (start_time_unix_nano,
+  // time_unix_nano].
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  //
+  // Value of 0 indicates that the timestamp is unspecified. In that case the
+  // timestamp may be decided by the backend.
+  fixed64 start_time_unix_nano = 2;
+
+  // time_unix_nano is the moment when this aggregation value was reported.
+  // 
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  fixed64 time_unix_nano = 3;
+
+  // count is the number of values in the population. Must be non-negative. This
+  // value must be equal to the sum of the "count" fields in buckets if a
+  // histogram is provided.
+  fixed64 count = 4;
+
+  // sum of the values in the population. If count is zero then this field
+  // must be zero. This value must be equal to the sum of the "sum" fields in
+  // buckets if a histogram is provided.
+  double sum = 5;
+
+  // bucket_counts is an optional field that contains the histogram count values
+  // for each bucket.
+  //
+  // The sum of the bucket_counts must equal the value in the count field.
+  //
+  // The number of elements in bucket_counts array must be one greater than
+  // the number of elements in explicit_bounds array.
+  repeated fixed64 bucket_counts = 6;
+
+  // A histogram may optionally contain the distribution of the values in the population.
+  // In that case one of the option fields below and "buckets" field both must be defined.
+  // Otherwise all option fields and "buckets" field must be omitted in which case the
+  // distribution of values in the histogram is unknown and only the total count and sum are known.
+
+  // explicit_bounds is the only supported bucket option currently.
+  // TODO: Add more bucket options.
+
+  // explicit_bounds specifies buckets with explicitly defined bounds for values.
+  // The bucket boundaries are described by "bounds" field.
+  //
+  // This defines size(bounds) + 1 (= N) buckets. The boundaries for bucket
+  // at index i are:
+  //
+  // (-infinity, bounds[i]) for i == 0
+  // [bounds[i-1], bounds[i]) for 0 < i < N-1
+  // [bounds[i-1], +infinity) for i == N-1
+  // The values in bounds array must be strictly increasing.
+  //
+  // Note: only [a, b) intervals are currently supported for each bucket except the first one.
+  // If we decide to also support (a, b] intervals we should add support for these by defining
+  // a boolean value which decides what type of intervals to use.
+  repeated double explicit_bounds = 7;
+
+  // (Optional) List of exemplars collected from
+  // measurements that were used to form the data point
+  repeated DoubleExemplar exemplars = 8;
+}
+
+// DoubleSummaryDataPoint is a single data point in a timeseries that describes the
+// time-varying values of a Summary metric: a total count and sum plus an optional
+// list of values at selected quantiles.
+message DoubleSummaryDataPoint {
+  // The set of labels that uniquely identify this timeseries.
+  repeated opentelemetry.proto.common.v1.StringKeyValue labels = 1;
+
+  // start_time_unix_nano is the last time when the aggregation value was reset
+  // to "zero". For some metric types this is ignored; see the individual data
+  // types for more details.
+  //
+  // The aggregation value is over the time interval (start_time_unix_nano,
+  // time_unix_nano].
+  //
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  //
+  // Value of 0 indicates that the timestamp is unspecified. In that case the
+  // timestamp may be decided by the backend.
+  fixed64 start_time_unix_nano = 2;
+
+  // time_unix_nano is the moment when this aggregation value was reported.
+  //
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  fixed64 time_unix_nano = 3;
+
+  // count is the number of values in the population. Must be non-negative.
+  fixed64 count = 4;
+
+  // sum of the values in the population. If count is zero then this field
+  // must be zero.
+  double sum = 5;
+
+  // Represents the value at a given quantile of a distribution.
+  //
+  // To record Min and Max values, the following conventions are used:
+  // - The 1.0 quantile is equivalent to the maximum value observed.
+  // - The 0.0 quantile is equivalent to the minimum value observed.
+  //
+  // See the following issue for more context:
+  // https://github.com/open-telemetry/opentelemetry-proto/issues/125
+  message ValueAtQuantile {
+    // The quantile of a distribution. Must be in the interval
+    // [0.0, 1.0].
+    double quantile = 1;
+
+    // The value at the given quantile of a distribution.
+    double value = 2;
+  }
+
+  // (Optional) List of values at different quantiles of the distribution, calculated
+  // from the current snapshot. The quantiles must be strictly increasing.
+  repeated ValueAtQuantile quantile_values = 6;
+}
+
+// A representation of an exemplar, which is a sample input int measurement.
+// Exemplars also hold information about the environment when the measurement
+// was recorded, for example the span and trace ID of the active span when the
+// exemplar was recorded.
+message IntExemplar {
+  // The set of labels that were filtered out by the aggregator, but recorded
+  // alongside the original measurement. Only labels that were filtered out
+  // by the aggregator should be included.
+  repeated opentelemetry.proto.common.v1.StringKeyValue filtered_labels = 1;
+
+  // time_unix_nano is the exact time when this exemplar was recorded.
+  //
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  fixed64 time_unix_nano = 2;
+
+  // Numerical int value of the measurement that was recorded.
+  sfixed64 value = 3;
+
+  // (Optional) Span ID of the exemplar trace.
+  // span_id may be missing if the measurement is not recorded inside a trace
+  // or if the trace is not sampled.
+  bytes span_id = 4;
+
+  // (Optional) Trace ID of the exemplar trace.
+  // trace_id may be missing if the measurement is not recorded inside a trace
+  // or if the trace is not sampled.
+  bytes trace_id = 5;
+}
+
+// A representation of an exemplar, which is a sample input double measurement.
+// Exemplars also hold information about the environment when the measurement
+// was recorded, for example the span and trace ID of the active span when the
+// exemplar was recorded.
+message DoubleExemplar {
+  // The set of labels that were filtered out by the aggregator, but recorded
+  // alongside the original measurement. Only labels that were filtered out
+  // by the aggregator should be included.
+  repeated opentelemetry.proto.common.v1.StringKeyValue filtered_labels = 1;
+
+  // time_unix_nano is the exact time when this exemplar was recorded.
+  //
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
+  // 1970.
+  fixed64 time_unix_nano = 2;
+
+  // Numerical double value of the measurement that was recorded.
+  double value = 3;
+
+  // (Optional) Span ID of the exemplar trace.
+  // span_id may be missing if the measurement is not recorded inside a trace
+  // or if the trace is not sampled.
+  bytes span_id = 4;
+
+  // (Optional) Trace ID of the exemplar trace.
+  // trace_id may be missing if the measurement is not recorded inside a trace
+  // or if the trace is not sampled.
+  bytes trace_id = 5;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/resource/v1/resource.proto b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/resource/v1/resource.proto
new file mode 100644
index 0000000000..88a735a746
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/proto/opentelemetry/proto/resource/v1/resource.proto
@@ -0,0 +1,34 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.resource.v1;
+
+import "opentelemetry/proto/common/v1/common.proto";
+
+option java_multiple_files = true;
+// NOTE(review): java_package is not derived from the proto package above; the
+// "firehose.v0_7" segment presumably keeps these generated classes apart from the
+// other OTEL proto classes used by the OAP - confirm before changing it.
+option java_package = "io.opentelemetry.proto.resource.firehose.v0_7";
+option java_outer_classname = "ResourceProto";
+option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1";
+
+// Resource information: the attributes describing the resource, plus a count
+// of attributes that were dropped.
+message Resource {
+  // Set of key/value attributes that describe the resource.
+  repeated opentelemetry.proto.common.v1.KeyValue attributes = 1;
+
+  // dropped_attributes_count is the number of dropped attributes. If the value is 0, then
+  // no attributes were dropped.
+  uint32 dropped_attributes_count = 2;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertorTest.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertorTest.java
new file mode 100644
index 0000000000..1a97e9c9aa
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/java/org/apache/skywalking/oap/server/receiver/aws/firehose/OtelMetricsConvertorTest.java
@@ -0,0 +1,90 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks {@link OtelMetricsConvertor#convertExportMetricsRequest} against JSON fixtures.
+ *
+ * <p>Every sub-directory of {@code src/test/resources/convertor-test-data} is one case: its
+ * {@code source.json} (an OTEL 0.7.0 request) is parsed, converted, printed back to JSON and
+ * compared structurally with its {@code expect.json}.
+ */
+public class OtelMetricsConvertorTest {
+
+    /** Root of the fixture directories, resolved relative to the module's working directory. */
+    private static final Path TEST_DATA_DIR = Paths.get("src", "test", "resources", "convertor-test-data");
+
+    /**
+     * Converts every fixture and compares the result with the expected JSON.
+     *
+     * @throws IOException if a fixture file cannot be read or parsed
+     */
+    @Test
+    public void test() throws IOException {
+        final List<TestData> testDataList = findTestData();
+        // Without this guard the loop below would not run and the test would pass vacuously
+        // when the fixtures cannot be located (e.g. unexpected working directory).
+        Assert.assertFalse(
+            "no test data found under " + TEST_DATA_DIR.toAbsolutePath(), testDataList.isEmpty());
+
+        final Gson gson = new Gson();
+        for (TestData testData : testDataList) {
+            final io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest request =
+                convertSource(testData.getSourceFile());
+            // Compare as generic maps so that formatting and key order of the JSON do not matter.
+            final Map<?, ?> convertedData = gson.fromJson(JsonFormat.printer().print(request), Map.class);
+            final Map<?, ?> expect = gson.fromJson(readFile(testData.getExpectFile()), Map.class);
+            Assert.assertEquals(
+                String.format("diff , %s -> %s", testData.getSourceFile(), testData.getExpectFile()),
+                expect,
+                convertedData
+            );
+            System.out.printf("test pass %s -> %s %n", testData.getSourceFile(), testData.getExpectFile());
+        }
+    }
+
+    /**
+     * Parses {@code sourceFile} as an OTEL 0.7.0 export request and runs it through the convertor.
+     *
+     * @param sourceFile JSON file holding the 0.7.0 request
+     * @return the converted request
+     * @throws IOException if the file cannot be read or is not valid JSON for the message
+     */
+    private io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest convertSource(final File sourceFile) throws IOException {
+        final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder();
+        JsonFormat.parser().merge(readFile(sourceFile), builder);
+        return OtelMetricsConvertor.convertExportMetricsRequest(builder.build());
+    }
+
+    /**
+     * Lists one {@link TestData} per sub-directory of {@link #TEST_DATA_DIR}.
+     *
+     * @return the discovered cases; empty when the directory does not exist or has no sub-directory
+     */
+    private List<TestData> findTestData() {
+        final List<TestData> res = new ArrayList<>();
+        final File[] subFiles = TEST_DATA_DIR.toFile().listFiles(File::isDirectory);
+        if (subFiles == null) {
+            return res;
+        }
+        for (File subFile : subFiles) {
+            res.add(new TestData(new File(subFile, "source.json"), new File(subFile, "expect.json")));
+        }
+        return res;
+    }
+
+    /** Reads a whole file as UTF-8 text, independent of the platform default charset. */
+    private static String readFile(final File file) throws IOException {
+        return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
+    }
+
+    /** One conversion case: the input request and the expected conversion result. */
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    private static class TestData {
+        // Input request, in OTEL 0.7.0 JSON form.
+        private File sourceFile;
+        // Expected JSON of the converted request.
+        private File expectFile;
+    }
+
+}
+
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/expect.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/expect.json
new file mode 100644
index 0000000000..7d97bca7bf
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/expect.json
@@ -0,0 +1,132 @@
+{
+  "resourceMetrics": [
+    {
+      "resource": {
+        "attributes": [
+          {
+            "key": "cloud.provider",
+            "value": {
+              "stringValue": "aws"
+            }
+          },
+          {
+            "key": "cloud.account.id",
+            "value": {
+              "stringValue": "xxxxxxxx"
+            }
+          },
+          {
+            "key": "cloud.region",
+            "value": {
+              "stringValue": "ap-northeast-1"
+            }
+          },
+          {
+            "key": "aws.exporter.arn",
+            "value": {
+              "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+            }
+          }
+        ]
+      },
+      "scopeMetrics": [
+        {
+          "metrics": [
+            {
+              "name": "amazonaws.com/AWS/S3/4xxErrors",
+              "unit": "{Count}",
+              "summary": {
+                "dataPoints": [
+                  {
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "quantileValues": [
+                      {
+                      },
+                      {
+                        "quantile": 1.0
+                      }
+                    ],
+                    "attributes": [
+                      {
+                        "key": "Namespace",
+                        "value": {
+                          "stringValue": "AWS/S3"
+                        }
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": {
+                          "stringValue": "4xxErrors"
+                        }
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": {
+                          "stringValue": "skywalking"
+                        }
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": {
+                          "stringValue": "test"
+                        }
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/5xxErrors",
+              "unit": "{Count}",
+              "summary": {
+                "dataPoints": [
+                  {
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "quantileValues": [
+                      {
+                      },
+                      {
+                        "quantile": 1.0
+                      }
+                    ],
+                    "attributes": [
+                      {
+                        "key": "Namespace",
+                        "value": {
+                          "stringValue": "AWS/S3"
+                        }
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": {
+                          "stringValue": "5xxErrors"
+                        }
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": {
+                          "stringValue": "skywalking"
+                        }
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": {
+                          "stringValue": "test"
+                        }
+                      }
+                    ]
+                  }
+                ]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/source.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/source.json
new file mode 100644
index 0000000000..4a454e6ba7
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-1/source.json
@@ -0,0 +1,116 @@
+{
+  "resourceMetrics": [
+    {
+      "resource": {
+        "attributes": [
+          {
+            "key": "cloud.provider",
+            "value": {
+              "stringValue": "aws"
+            }
+          },
+          {
+            "key": "cloud.account.id",
+            "value": {
+              "stringValue": "xxxxxxxx"
+            }
+          },
+          {
+            "key": "cloud.region",
+            "value": {
+              "stringValue": "ap-northeast-1"
+            }
+          },
+          {
+            "key": "aws.exporter.arn",
+            "value": {
+              "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+            }
+          }
+        ]
+      },
+      "instrumentationLibraryMetrics": [
+        {
+          "metrics": [
+            {
+              "name": "amazonaws.com/AWS/S3/4xxErrors",
+              "unit": "{Count}",
+              "doubleSummary": {
+                "dataPoints": [
+                  {
+                    "labels": [
+                      {
+                        "key": "Namespace",
+                        "value": "AWS/S3"
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": "4xxErrors"
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": "skywalking"
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": "test"
+                      }
+                    ],
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "quantileValues": [
+                      {
+                      },
+                      {
+                        "quantile": 1.0
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/5xxErrors",
+              "unit": "{Count}",
+              "doubleSummary": {
+                "dataPoints": [
+                  {
+                    "labels": [
+                      {
+                        "key": "Namespace",
+                        "value": "AWS/S3"
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": "5xxErrors"
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": "skywalking"
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": "test"
+                      }
+                    ],
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "quantileValues": [
+                      {
+                      },
+                      {
+                        "quantile": 1.0
+                      }
+                    ]
+                  }
+                ]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/expect.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/expect.json
new file mode 100644
index 0000000000..cddc0dae9b
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/expect.json
@@ -0,0 +1,285 @@
+{
+  "resourceMetrics": [
+    {
+      "resource": {
+        "attributes": [
+          {
+            "key": "cloud.provider",
+            "value": {
+              "stringValue": "aws"
+            }
+          },
+          {
+            "key": "cloud.account.id",
+            "value": {
+              "stringValue": "xxxxxxxx"
+            }
+          },
+          {
+            "key": "cloud.region",
+            "value": {
+              "stringValue": "ap-northeast-1"
+            }
+          },
+          {
+            "key": "aws.exporter.arn",
+            "value": {
+              "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+            }
+          }
+        ]
+      },
+      "scopeMetrics": [
+        {
+          "metrics": [
+            {
+              "name": "amazonaws.com/AWS/S3/BytesDownloaded",
+              "unit": "By",
+              "summary": {
+                "dataPoints": [
+                  {
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 166199.0,
+                    "quantileValues": [
+                      {
+                        "value": 166199.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 166199.0
+                      }
+                    ],
+                    "attributes": [
+                      {
+                        "key": "Namespace",
+                        "value": {
+                          "stringValue": "AWS/S3"
+                        }
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": {
+                          "stringValue": "BytesDownloaded"
+                        }
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": {
+                          "stringValue": "skywalking"
+                        }
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": {
+                          "stringValue": "test"
+                        }
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/TotalRequestLatency",
+              "unit": "ms",
+              "summary": {
+                "dataPoints": [
+                  {
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 66.0,
+                    "quantileValues": [
+                      {
+                        "value": 66.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 66.0
+                      }
+                    ],
+                    "attributes": [
+                      {
+                        "key": "Namespace",
+                        "value": {
+                          "stringValue": "AWS/S3"
+                        }
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": {
+                          "stringValue": "TotalRequestLatency"
+                        }
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": {
+                          "stringValue": "skywalking"
+                        }
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": {
+                          "stringValue": "test"
+                        }
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/FirstByteLatency",
+              "unit": "ms",
+              "summary": {
+                "dataPoints": [
+                  {
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 64.0,
+                    "quantileValues": [
+                      {
+                        "value": 64.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 64.0
+                      }
+                    ],
+                    "attributes": [
+                      {
+                        "key": "Namespace",
+                        "value": {
+                          "stringValue": "AWS/S3"
+                        }
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": {
+                          "stringValue": "FirstByteLatency"
+                        }
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": {
+                          "stringValue": "skywalking"
+                        }
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": {
+                          "stringValue": "test"
+                        }
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/GetRequests",
+              "unit": "{Count}",
+              "summary": {
+                "dataPoints": [
+                  {
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 1.0,
+                    "quantileValues": [
+                      {
+                        "value": 1.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 1.0
+                      }
+                    ],
+                    "attributes": [
+                      {
+                        "key": "Namespace",
+                        "value": {
+                          "stringValue": "AWS/S3"
+                        }
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": {
+                          "stringValue": "GetRequests"
+                        }
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": {
+                          "stringValue": "skywalking"
+                        }
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": {
+                          "stringValue": "test"
+                        }
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/AllRequests",
+              "unit": "{Count}",
+              "summary": {
+                "dataPoints": [
+                  {
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 1.0,
+                    "quantileValues": [
+                      {
+                        "value": 1.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 1.0
+                      }
+                    ],
+                    "attributes": [
+                      {
+                        "key": "Namespace",
+                        "value": {
+                          "stringValue": "AWS/S3"
+                        }
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": {
+                          "stringValue": "AllRequests"
+                        }
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": {
+                          "stringValue": "skywalking"
+                        }
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": {
+                          "stringValue": "test"
+                        }
+                      }
+                    ]
+                  }
+                ]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/source.json b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/source.json
new file mode 100644
index 0000000000..acaae8a04d
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/test/resources/convertor-test-data/s3-data-2/source.json
@@ -0,0 +1,245 @@
+{
+  "resourceMetrics": [
+    {
+      "resource": {
+        "attributes": [
+          {
+            "key": "cloud.provider",
+            "value": {
+              "stringValue": "aws"
+            }
+          },
+          {
+            "key": "cloud.account.id",
+            "value": {
+              "stringValue": "xxxxxxxx"
+            }
+          },
+          {
+            "key": "cloud.region",
+            "value": {
+              "stringValue": "ap-northeast-1"
+            }
+          },
+          {
+            "key": "aws.exporter.arn",
+            "value": {
+              "stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
+            }
+          }
+        ]
+      },
+      "instrumentationLibraryMetrics": [
+        {
+          "metrics": [
+            {
+              "name": "amazonaws.com/AWS/S3/BytesDownloaded",
+              "unit": "By",
+              "doubleSummary": {
+                "dataPoints": [
+                  {
+                    "labels": [
+                      {
+                        "key": "Namespace",
+                        "value": "AWS/S3"
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": "BytesDownloaded"
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": "skywalking"
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": "test"
+                      }
+                    ],
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 166199.0,
+                    "quantileValues": [
+                      {
+                        "value": 166199.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 166199.0
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/TotalRequestLatency",
+              "unit": "ms",
+              "doubleSummary": {
+                "dataPoints": [
+                  {
+                    "labels": [
+                      {
+                        "key": "Namespace",
+                        "value": "AWS/S3"
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": "TotalRequestLatency"
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": "skywalking"
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": "test"
+                      }
+                    ],
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 66.0,
+                    "quantileValues": [
+                      {
+                        "value": 66.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 66.0
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/FirstByteLatency",
+              "unit": "ms",
+              "doubleSummary": {
+                "dataPoints": [
+                  {
+                    "labels": [
+                      {
+                        "key": "Namespace",
+                        "value": "AWS/S3"
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": "FirstByteLatency"
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": "skywalking"
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": "test"
+                      }
+                    ],
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 64.0,
+                    "quantileValues": [
+                      {
+                        "value": 64.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 64.0
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/GetRequests",
+              "unit": "{Count}",
+              "doubleSummary": {
+                "dataPoints": [
+                  {
+                    "labels": [
+                      {
+                        "key": "Namespace",
+                        "value": "AWS/S3"
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": "GetRequests"
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": "skywalking"
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": "test"
+                      }
+                    ],
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 1.0,
+                    "quantileValues": [
+                      {
+                        "value": 1.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 1.0
+                      }
+                    ]
+                  }
+                ]
+              }
+            },
+            {
+              "name": "amazonaws.com/AWS/S3/AllRequests",
+              "unit": "{Count}",
+              "doubleSummary": {
+                "dataPoints": [
+                  {
+                    "labels": [
+                      {
+                        "key": "Namespace",
+                        "value": "AWS/S3"
+                      },
+                      {
+                        "key": "MetricName",
+                        "value": "AllRequests"
+                      },
+                      {
+                        "key": "BucketName",
+                        "value": "skywalking"
+                      },
+                      {
+                        "key": "FilterId",
+                        "value": "test"
+                      }
+                    ],
+                    "startTimeUnixNano": "1674547500000000000",
+                    "timeUnixNano": "1674547560000000000",
+                    "count": "1",
+                    "sum": 1.0,
+                    "quantileValues": [
+                      {
+                        "value": 1.0
+                      },
+                      {
+                        "quantile": 1.0,
+                        "value": 1.0
+                      }
+                    ]
+                  }
+                ]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}