You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/06/08 01:19:55 UTC
[skywalking] branch master updated: Ingest prometheus metrics
through meter system (#4783)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 4f0f39f Ingest prometheus metrics through meter system (#4783)
4f0f39f is described below
commit 4f0f39ffccdc9b41049903cc540b8904f7c9728e
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jun 8 09:19:34 2020 +0800
Ingest prometheus metrics through meter system (#4783)
---
apm-dist-es7/src/main/assembly/binary-es7.xml | 1 +
apm-dist/src/main/assembly/binary.xml | 1 +
dist-material/release-docs/LICENSE | 2 +
dist-material/release-docs/NOTICE | 4 +-
docker/oap/docker-entrypoint.sh | 5 -
docs/en/setup/backend/backend-fetcher.md | 76 +++-
docs/en/setup/backend/backend-telemetry.md | 34 +-
oap-server/pom.xml | 12 +
oap-server/server-bootstrap/pom.xml | 11 +-
.../src/main/resources/application.yml | 8 -
.../main/resources/fetcher-prom-rules/self.yaml | 170 ++++++++
.../server-bootstrap/src/main/resources/log4j2.xml | 2 +-
.../server/core/analysis/meter/MeterEntity.java | 2 +
.../server/core/analysis/meter/MeterSystem.java | 37 ++
.../core/analysis/meter/function/AvgFunction.java | 24 +-
...gramFunction.java => AvgHistogramFunction.java} | 101 +++--
...on.java => AvgHistogramPercentileFunction.java} | 115 ++++--
.../analysis/meter/function/HistogramFunction.java | 22 +-
.../meter/function/PercentileFunction.java | 24 +-
.../analysis/worker/MetricsPersistentWorker.java | 4 +-
oap-server/server-fetcher-plugin/pom.xml | 7 +
.../prometheus-fetcher-plugin/pom.xml | 8 +
.../provider/PrometheusFetcherConfig.java | 2 +
.../provider/PrometheusFetcherProvider.java | 343 +++++++++++++----
.../fetcher/prometheus/provider/counter/ID.java} | 26 +-
.../prometheus/provider/counter/Window.java | 89 +++++
.../MetricSource.java} | 22 +-
.../Operation.java} | 22 +-
.../prometheus/provider/rule/CounterFunction.java} | 10 +-
.../MetricsRule.java} | 21 +-
.../PrometheusMetric.java} | 17 +-
.../Relabel.java} | 17 +-
.../Rule.java} | 20 +-
.../fetcher/prometheus/provider/rule/Rules.java | 60 +++
.../StaticConfig.java} | 17 +-
.../prometheus/provider/rule/RulesTest.java} | 23 +-
.../resources/fetcher-prom-rules/localhost.yaml} | 27 +-
.../src/test/resources/log4j2.xml} | 33 +-
oap-server/server-library/library-util/pom.xml | 4 +
.../oap/server/library/util/ResourceUtils.java | 17 +-
.../server/library/util/prometheus/Parser.java} | 11 +-
.../server/library/util/prometheus/Parsers.java} | 21 +-
.../library/util/prometheus/metrics/Counter.java} | 32 +-
.../library/util/prometheus/metrics/Gauge.java} | 32 +-
.../library/util/prometheus/metrics/Histogram.java | 61 +++
.../library/util/prometheus/metrics/Metric.java} | 29 +-
.../util/prometheus/metrics/MetricFamily.java | 137 +++++++
.../util/prometheus/metrics/MetricType.java} | 12 +-
.../library/util/prometheus/metrics/Summary.java | 56 +++
.../library/util/prometheus/parser/Context.java | 153 ++++++++
.../library/util/prometheus/parser/TextParser.java | 128 +++++++
.../util/prometheus/parser/sample/Context.java} | 16 +-
.../util/prometheus/parser/sample/State.java | 175 +++++++++
.../util/prometheus/parser/sample/TextSample.java | 49 +++
.../util/prometheus/parser/TextParserTest.java | 109 ++++++
.../src/test/resources/testdata/prometheus.txt | 36 ++
oap-server/server-receiver-plugin/pom.xml | 1 -
.../skywalking-so11y-receiver-plugin/pom.xml | 39 --
.../so11y/So11yReceiverModuleProvider.java | 426 ---------------------
...alking.oap.server.library.module.ModuleProvider | 19 -
oap-server/server-telemetry/pom.xml | 1 -
.../telemetry/so11y/So11yMetricsCollector.java | 50 ---
.../telemetry/so11y/So11yMetricsCreator.java | 48 ---
.../telemetry/so11y/So11yTelemetryProvider.java | 85 ----
...alking.oap.server.library.module.ModuleProvider | 20 -
.../known-oap-backend-dependencies-es7.txt | 2 +
.../known-oap-backend-dependencies.txt | 2 +
67 files changed, 2123 insertions(+), 1067 deletions(-)
diff --git a/apm-dist-es7/src/main/assembly/binary-es7.xml b/apm-dist-es7/src/main/assembly/binary-es7.xml
index 2a27119..880618f 100644
--- a/apm-dist-es7/src/main/assembly/binary-es7.xml
+++ b/apm-dist-es7/src/main/assembly/binary-es7.xml
@@ -56,6 +56,7 @@
<include>oal/java-agent.oal</include>
<include>oal/dotnet-agent.oal</include>
<include>oal/envoy.oal</include>
+ <include>fetcher-prom-rules/self.yaml</include>
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
diff --git a/apm-dist/src/main/assembly/binary.xml b/apm-dist/src/main/assembly/binary.xml
index 00dc189..c743553 100644
--- a/apm-dist/src/main/assembly/binary.xml
+++ b/apm-dist/src/main/assembly/binary.xml
@@ -56,6 +56,7 @@
<include>oal/java-agent.oal</include>
<include>oal/dotnet-agent.oal</include>
<include>oal/envoy.oal</include>
+ <include>fetcher-prom-rules/self.yaml</include>
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE
index 60a18f8..91f8873 100755
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -258,6 +258,7 @@ The text of each license is the standard Apache 2.0 license.
Apache: commons-dbcp 1.4: https://github.com/apache/commons-dbcp, Apache 2.0
Apache: commons-pool 2.4.2: https://github.com/apache/commons-pool, Apache 2.0
Apache: commons-lang 3.6: https://github.com/apache/commons-lang, Apache 2.0
+ Apache: commons-text 1.8: https://github.com/apache/commons-text, Apache 2.0
Apache: lucene 6.6.0: https://github.com/apache/lucene-solr/tree/master/lucene, Apache 2.0
Apache: httpasyncclient 4.1.2: https://github.com/apache/httpasyncclient/tree/4.1.2, Apache 2.0
Apache: log4j 1.2.16: http://logging.apache.org/log4j/1.2/, Apache 2.0
@@ -326,6 +327,7 @@ The text of each license is the standard Apache 2.0 license.
moshi 1.5.0: https://github.com/square/moshi, Apache 2.0
logging-interceptor 3.13.1: https://github.com/square/okhttp/tree/master/okhttp-logging-interceptor, Apache 2.0
msgpack-core 0.8.16: https://github.com/msgpack/msgpack-java, Apache 2.0
+ vavr 0.10.3: https://github.com/vavr-io/vavr, Apache 2.0
========================================================================
MIT licenses
diff --git a/dist-material/release-docs/NOTICE b/dist-material/release-docs/NOTICE
index f762cd1..d3ecdc6 100755
--- a/dist-material/release-docs/NOTICE
+++ b/dist-material/release-docs/NOTICE
@@ -876,4 +876,6 @@ MessagePackage Notice
This product includes the software developed by third-party:
* Google Guava https://code.google.com/p/guava-libraries/ (APL2)
- * sbt-extras: https://github.com/paulp/sbt-extras (BSD) (LICENSE.sbt-extras.txt)
\ No newline at end of file
+ * sbt-extras: https://github.com/paulp/sbt-extras (BSD) (LICENSE.sbt-extras.txt)
+
+========================================================================
diff --git a/docker/oap/docker-entrypoint.sh b/docker/oap/docker-entrypoint.sh
index 7007d46..9ec6d2f 100755
--- a/docker/oap/docker-entrypoint.sh
+++ b/docker/oap/docker-entrypoint.sh
@@ -19,11 +19,6 @@ set -e
echo "[Entrypoint] Apache SkyWalking Docker Image"
-if [ "$SW_TELEMETRY" = "so11y" ]; then
- export SW_RECEIVER_SO11Y=default
- echo "Set SW_RECEIVER_SO11Y to ${SW_RECEIVER_SO11Y}"
-fi
-
EXT_LIB_DIR=/skywalking/ext-libs
EXT_CONFIG_DIR=/skywalking/ext-config
diff --git a/docs/en/setup/backend/backend-fetcher.md b/docs/en/setup/backend/backend-fetcher.md
index f3a5731..ab9c33e 100644
--- a/docs/en/setup/backend/backend-fetcher.md
+++ b/docs/en/setup/backend/backend-fetcher.md
@@ -10,4 +10,78 @@ prometheus-fetcher:
active: ${SW_PROMETHEUS_FETCHER_ACTIVE:false}
```
-TODO: More detail should be added when this fetcher provided.
\ No newline at end of file
+### Configuration file
+Prometheus fetcher is configured via a configuration file. The configuration file defines everything related to fetching
+ services and their instances, as well as which rule files to load.
+
+OAP can load the configuration at bootstrap. If the new configuration is not well-formed, OAP fails to start up. The files
+are located at `$CLASSPATH/fetcher-prom-rules`.
+
+The file is written in YAML format, defined by the schema described below. Brackets indicate that a parameter is optional.
+
+A full example can be found [here](../../../../oap-server/server-bootstrap/src/main/resources/fetcher-prom-rules/self.yaml)
+
+Generic placeholders are defined as follows:
+
+ * `<duration>`: a textual representation of a duration. The accepted formats are based on
+ the ISO-8601 duration format `PnDTnHnMn.nS` with days considered to be exactly 24 hours.
+ * `<labelname>`: a string matching the regular expression `[a-zA-Z_][a-zA-Z0-9_]*`
+ * `<labelvalue>`: a string of unicode characters
+ * `<host>`: a valid string consisting of a hostname or IP followed by an optional port number
+ * `<path>`: a valid URL path
+ * `<string>`: a regular string
+
+```yaml
+# How frequently to fetch targets.
+fetcherInterval: <duration>
+# Per-fetch timeout when fetching this target.
+fetcherTimeout: <duration>
+# The HTTP resource path on which to fetch metrics from targets.
+metricsPath: <path>
+# Statically configured targets.
+staticConfig:
+ # The targets specified by the static config.
+ targets:
+ [ - <host> ]
+ # Labels assigned to all metrics fetched from the targets.
+ labels:
+ [ <labelname>: <labelvalue> ... ]
+# Metrics rules allow you to recompute queries.
+metricsRules:
+ [ - <metric_rules> ]
+```
+
+#### `<metric_rules>`
+
+```yaml
+# The name of the rule, which is combined with the prefix 'meter_' to form the index/table name in storage.
+name: <string>
+# Scope should be one of SERVICE, SERVICE_INSTANCE and ENDPOINT.
+scope: <string>
+# The transformation operation from prometheus metrics to skywalking ones.
+operation: <operation>
+# The prometheus sources of the transformation operation.
+sources:
+ # The prometheus metric family name
+ <string>:
+ # Function for counter, one of INCREASE, RATE, and IRATE.
+ [counterFunction: <string> ]
+ # The range of a counterFunction.
+ [range: <duration>]
+ # The percentile rank of percentile operation
+ [percentiles: [<rank>,...]]
+ # Relabel prometheus labels to skywalking dimensions.
+ relabel:
+ service: [<labelname>, ...]
+ [instance: [<labelname>, ...]]
+ [endpoint: [<labelname>, ...]]
+```
+
+#### `<operation>`
+
+The available operations are `avg`, `avgHistogram` and `avgHistogramPercentile`. `avg` and the `avgXXX` operations average
+the raw fetched (high-rate) metrics into low-rate metrics. This process extends SkyWalking downsampling by
+adding the step from raw data to minute-level data.
+
+When you specify `avgHistogram` or `avgHistogramPercentile`, the source should be of type `histogram`. A counterFunction
+is also required because the `bucket`, `sum` and `count` of a histogram are counters.
diff --git a/docs/en/setup/backend/backend-telemetry.md b/docs/en/setup/backend/backend-telemetry.md
index 142d984..dd6e9dc 100644
--- a/docs/en/setup/backend/backend-telemetry.md
+++ b/docs/en/setup/backend/backend-telemetry.md
@@ -8,10 +8,6 @@ telemetry:
prometheus:
host: ${SW_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
port: ${SW_TELEMETRY_PROMETHEUS_PORT:1234}
- so11y:
- prometheusExporterEnabled: ${SW_TELEMETRY_SO11Y_PROMETHEUS_ENABLED:true}
- prometheusExporterHost: ${SW_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
- prometheusExporterPort: ${SW_TELEMETRY_PROMETHEUS_PORT:1234}
```
but you can set one of `prometheus` or `so11y` to enable them, for more information, refer to the details below.
@@ -45,26 +41,24 @@ Provide the grafana dashboard settings. Check [SkyWalking Telemetry dashboard](g
SkyWalking supports to collect telemetry data into OAP backend directly. Users could check them out through UI or
GraphQL API then.
-Adding following configuration to enable `so11y`(self-observability) related modules.
+Add the following configuration to enable the self-observability related modules.
+1. Setting up prometheus telemetry.
```yaml
-receiver-so11y:
- selector: ${SW_RECEIVER_SO11Y:default}
- default:
telemetry:
- selector: ${SW_TELEMETRY:so11y}
- # ... other configurations
-```
+ selector: ${SW_TELEMETRY:prometheus}
+ prometheus:
+ host: 127.0.0.1
+ port: 1543
+```
-Another example represents how to combine `promethues` and `so11y`. Adding some items in `so11y` to make it happen.
+2. Setting up prometheus fetcher
```yaml
-telemetry:
- selector: ${SW_TELEMETRY:so11y}
- so11y:
- prometheusExporterEnabled: true
- prometheusExporterHost: 0.0.0.0
- prometheusExporterPort: 1234
-```
+prometheus-fetcher:
+ selector: ${SW_PROMETHEUS_FETCHER:default}
+ default:
+ active: ${SW_PROMETHEUS_FETCHER_ACTIVE:true}
+```
-Then prometheus exporter is listening on `0.0.0.0:1234`.
+3. Make sure `config/fetcher-prom-rules/self.yaml` exists.
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 21bca9b..4d9defd 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -75,6 +75,7 @@
<jackson-databind.version>2.9.5</jackson-databind.version>
<commons-codec.version>1.11</commons-codec.version>
<commons-lang3.version>3.7</commons-lang3.version>
+ <commons-text.version>1.4</commons-text.version>
<simpleclient.version>0.6.0</simpleclient.version>
<apollo.version>1.4.0</apollo.version>
<maven-docker-plugin.version>0.30.0</maven-docker-plugin.version>
@@ -87,6 +88,7 @@
<antlr.version>4.7.1</antlr.version>
<freemarker.version>2.3.28</freemarker.version>
<javaassist.version>3.25.0-GA</javaassist.version>
+ <vavr.version>0.10.3</vavr.version>
<zookeeper.image.version>3.5</zookeeper.image.version>
<protobuf-java-util.version>3.11.4</protobuf-java-util.version>
@@ -494,6 +496,16 @@
<artifactId>javassist</artifactId>
<version>${javaassist.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ <version>${commons-text.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.vavr</groupId>
+ <artifactId>vavr</artifactId>
+ <version>${vavr.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/oap-server/server-bootstrap/pom.xml b/oap-server/server-bootstrap/pom.xml
index 347f5ba..4851090 100644
--- a/oap-server/server-bootstrap/pom.xml
+++ b/oap-server/server-bootstrap/pom.xml
@@ -109,11 +109,6 @@
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
- <artifactId>skywalking-so11y-receiver-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-profile-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
@@ -161,11 +156,6 @@
<artifactId>telemetry-prometheus</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>telemetry-so11y</artifactId>
- <version>${project.version}</version>
- </dependency>
<!-- exporter -->
<dependency>
@@ -238,6 +228,7 @@
<exclude>oal/java-agent.oal</exclude>
<exclude>oal/dotnet-agent.oal</exclude>
<exclude>oal/envoy.oal</exclude>
+ <exclude>fetcher-prom-rules/</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 8624586..3761bef 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -225,14 +225,6 @@ telemetry:
prometheus:
host: ${SW_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
port: ${SW_TELEMETRY_PROMETHEUS_PORT:1234}
- so11y:
- prometheusExporterEnabled: ${SW_TELEMETRY_SO11Y_PROMETHEUS_ENABLED:true}
- prometheusExporterHost: ${SW_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
- prometheusExporterPort: ${SW_TELEMETRY_PROMETHEUS_PORT:1234}
-
-receiver-so11y:
- selector: ${SW_RECEIVER_SO11Y:-}
- default:
configuration:
selector: ${SW_CONFIGURATION:none}
diff --git a/oap-server/server-bootstrap/src/main/resources/fetcher-prom-rules/self.yaml b/oap-server/server-bootstrap/src/main/resources/fetcher-prom-rules/self.yaml
new file mode 100644
index 0000000..76cfe62
--- /dev/null
+++ b/oap-server/server-bootstrap/src/main/resources/fetcher-prom-rules/self.yaml
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Durations in this file are textual representations of a duration. The formats
+# accepted are based on the ISO-8601 duration format PnDTnHnMn.nS
+# with days considered to be exactly 24 hours.
+#
+# Examples:
+#
+# "PT20.345S" -- parses as "20.345 seconds"
+# "PT15M" -- parses as "15 minutes" (where a minute is 60 seconds)
+# "PT10H" -- parses as "10 hours" (where an hour is 3600 seconds)
+# "P2D" -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
+# "P2DT3H4M" -- parses as "2 days, 3 hours and 4 minutes"
+# "PT-6H3M" -- parses as "-6 hours and +3 minutes"
+# "-PT6H3M" -- parses as "-6 hours and -3 minutes"
+# "-PT-6H+3M" -- parses as "+6 hours and -3 minutes"
+#
+fetcherInterval: PT15S
+fetcherTimeout: PT10S
+metricsPath: /metrics
+staticConfig:
+ # targets will be labeled as "instance"
+ targets:
+ - localhost:1234
+ labels:
+ service: oap-server
+metricsRules:
+ - name: instance_cpu_percentage
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ process_cpu_seconds_total:
+ counterFunction: RATE
+ range: PT1M
+ scale: 2
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: instance_jvm_memory_bytes_used
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ jvm_memory_bytes_used:
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: instance_jvm_gc_collection_seconds
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ jvm_gc_collection_seconds:
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+
+ - name: instance_persistence_timer_bulk_execute_latency_heatmap
+ scope: SERVICE_INSTANCE
+ operation: avgHistogram
+ sources:
+ persistence_timer_bulk_execute_latency:
+ counterFunction: INCREASE
+ range: PT5M
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: instance_persistence_timer_bulk_execute_latency_percentile
+ scope: SERVICE_INSTANCE
+ operation: avgHistogramPercentile
+ percentiles: [50, 70, 90, 99]
+ sources:
+ persistence_timer_bulk_execute_latency:
+ counterFunction: INCREASE
+ range: PT5M
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: instance_persistence_timer_bulk_execute_latency
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ persistence_timer_bulk_execute_latency:
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: instance_persistence_timer_bulk_prepare_latency
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ persistence_timer_bulk_prepare_latency:
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+
+ - name: instance_persistence_timer_bulk_error_count
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ persistence_timer_bulk_error_count:
+ counterFunction: INCREASE
+ range: PT1M
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: instance_persistence_timer_bulk_execute_latency_count
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ persistence_timer_bulk_execute_latency_count:
+ counterFunction: INCREASE
+ range: PT1M
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: instance_persistence_timer_bulk_prepare_latency_count
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ persistence_timer_bulk_prepare_latency_count:
+ counterFunction: INCREASE
+ range: PT1M
+ relabel:
+ service:
+ - service
+ instance:
+ - instance
+ - name: endpoint_metrics_aggregation
+ scope: ENDPOINT
+ operation: avg
+ sources:
+ metrics_aggregation:
+ counterFunction: INCREASE
+ range: PT1M
+ relabel:
+ service:
+ - service
+ endpoint:
+ - dimensionality
+ - level
+
diff --git a/oap-server/server-bootstrap/src/main/resources/log4j2.xml b/oap-server/server-bootstrap/src/main/resources/log4j2.xml
index 1f3ed39..cfb8240 100644
--- a/oap-server/server-bootstrap/src/main/resources/log4j2.xml
+++ b/oap-server/server-bootstrap/src/main/resources/log4j2.xml
@@ -33,9 +33,9 @@
<logger name="org.apache.http" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.core.alarm.AlarmStandardPersistence" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core" level="INFO"/>
+ <logger name="org.apache.skywalking.oap.server.core.analysis.worker" level="DEBUG" />
<logger name="org.apache.skywalking.oap.server.core.remote.client" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.library.buffer" level="INFO"/>
- <logger name="org.apache.skywalking.oap.server.receiver.so11y" level="DEBUG"/>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
index 388e2d7..2647bc7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.meter;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
@@ -28,6 +29,7 @@ import org.apache.skywalking.oap.server.core.analysis.IDManager;
*/
@EqualsAndHashCode
@ToString
+@Getter
public class MeterEntity {
private ScopeType scopeType;
private String serviceName;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
index 9bddeac..ccd3e98 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
@@ -25,6 +25,7 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
@@ -103,6 +104,42 @@ public class MeterSystem implements Service {
* @throws UnexpectedException if binary code manipulation fails or stream core failure.
*/
public synchronized <T> boolean create(String metricsName,
+ String functionName,
+ ScopeType type) throws IllegalArgumentException {
+ final Class<? extends MeterFunction> meterFunction = functionRegister.get(functionName);
+
+ if (meterFunction == null) {
+ throw new IllegalArgumentException("Function " + functionName + " can't be found.");
+ }
+ Type acceptance = null;
+ for (final Type genericInterface : meterFunction.getGenericInterfaces()) {
+ if (genericInterface instanceof ParameterizedType) {
+ ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
+ if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
+ Type[] arguments = parameterizedType.getActualTypeArguments();
+ acceptance = arguments[0];
+ break;
+ }
+ }
+ }
+ try {
+ return create(metricsName, functionName, type, Class.forName(Objects.requireNonNull(acceptance).getTypeName()));
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Create streaming calculation of the given metrics name. This method is synchronized due to its heavy implementation,
+ * which includes creating a dynamic class. Don't use this in a concurrent runtime.
+ *
+ * @param metricsName The name used as the storage entity and in the query stage.
+ * @param functionName The function provided through {@link MeterFunction}.
+ * @return true if created, false if it exists.
+ * @throws IllegalArgumentException if the parameter can't match the expectation.
+ * @throws UnexpectedException if binary code manipulation fails or stream core failure.
+ */
+ public synchronized <T> boolean create(String metricsName,
String functionName,
ScopeType type,
Class<T> dataType) throws IllegalArgumentException {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
index 424cbce..92fae1e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
@@ -20,9 +20,10 @@ package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.HashMap;
import java.util.Map;
-import lombok.EqualsAndHashCode;
+import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
@@ -34,10 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@MeterFunction(functionName = "avg")
-@EqualsAndHashCode(of = {
- "entityId",
- "timeBucket"
-})
+@ToString
public abstract class AvgFunction extends LongAvgMetrics implements AcceptableValue<Long> {
@Setter
@Getter
@@ -149,4 +147,20 @@ public abstract class AvgFunction extends LongAvgMetrics implements AcceptableVa
return map;
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof AvgFunction))
+ return false;
+ AvgFunction function = (AvgFunction) o;
+ return Objects.equals(entityId, function.entityId) &&
+ getTimeBucket() == function.getTimeBucket();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entityId, getTimeBucket());
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgHistogramFunction.java
similarity index 55%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgHistogramFunction.java
index f1150a6..1d7b343 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgHistogramFunction.java
@@ -18,9 +18,11 @@
package org.apache.skywalking.oap.server.core.analysis.meter.function;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import lombok.EqualsAndHashCode;
+import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@@ -30,24 +32,28 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
- * Histogram includes data range buckets and the amount matched/grouped in the buckets. This is for original histogram
- * graph visualization
+ * AvgHistogram intends to aggregate raw values over the interval (minute, hour or day). When users query a value
+ * from such an interval, an average over it will be sent back.
+ *
+ * The acceptable bucket value should be a result from one of "increase", "rate" and "irate" query functions.
+ * That means the value is the increase or per-second instant rate of increase in a specific range.
+ *
+ * Example:
+ * "persistence_timer_bulk_execute_latency" is a histogram; a possible PromQL expression for an acceptable bucket value is:
+ * "increase(persistence_timer_bulk_execute_latency{service="oap-server", instance="localhost:1234"}[5m])"
*/
-@MeterFunction(functionName = "histogram")
+@MeterFunction(functionName = "avgHistogram")
@Slf4j
-@EqualsAndHashCode(of = {
- "entityId",
- "timeBucket"
-})
@ToString
-public abstract class HistogramFunction extends Metrics implements AcceptableValue<BucketedValues> {
+public abstract class AvgHistogramFunction extends Metrics implements AcceptableValue<BucketedValues> {
public static final String DATASET = "dataset";
+ protected static final String SUMMATION = "summation";
+ protected static final String COUNT = "count";
@Setter
@Getter
@@ -55,6 +61,14 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
private String entityId;
@Getter
@Setter
+ @Column(columnName = SUMMATION, storageOnly = true)
+ protected DataTable summation = new DataTable(30);
+ @Getter
+ @Setter
+ @Column(columnName = COUNT, storageOnly = true)
+ protected DataTable count = new DataTable(30);
+ @Getter
+ @Setter
@Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, storageOnly = true, defaultValue = 0)
private DataTable dataset = new DataTable(30);
@@ -71,46 +85,51 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
final long[] values = value.getValues();
for (int i = 0; i < values.length; i++) {
- final long bucket = value.getBuckets()[i];
- String bucketName = bucket == Integer.MIN_VALUE ? Bucket.INFINITE_NEGATIVE : String.valueOf(bucket);
- final long bucketValue = values[i];
- dataset.valueAccumulation(bucketName, bucketValue);
+ String bucketName = String.valueOf(value.getBuckets()[i]);
+ summation.valueAccumulation(bucketName, values[i]);
+ count.valueAccumulation(bucketName, 1L);
}
}
@Override
public void combine(final Metrics metrics) {
- HistogramFunction histogram = (HistogramFunction) metrics;
+ AvgHistogramFunction histogram = (AvgHistogramFunction) metrics;
- if (!dataset.keysEqual(histogram.getDataset())) {
+ if (!summation.keysEqual(histogram.getSummation())) {
log.warn("Incompatible input [{}}] for current HistogramFunction[{}], entity {}",
histogram, this, entityId
);
return;
}
- this.dataset.append(histogram.dataset);
+ this.summation.append(histogram.summation);
+ this.count.append(histogram.count);
}
@Override
public void calculate() {
-
+ final List<String> sortedKeys = summation.sortedKeys(Comparator.comparingInt(Integer::parseInt));
+ for (String key : sortedKeys) {
+ dataset.put(key, summation.get(key) / count.get(key));
+ }
}
@Override
public Metrics toHour() {
- HistogramFunction metrics = (HistogramFunction) createNew();
+ AvgHistogramFunction metrics = (AvgHistogramFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
- metrics.setDataset(getDataset());
+ metrics.setCount(getCount());
+ metrics.setSummation(getSummation());
return metrics;
}
@Override
public Metrics toDay() {
- HistogramFunction metrics = (HistogramFunction) createNew();
+ AvgHistogramFunction metrics = (AvgHistogramFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
- metrics.setDataset(getDataset());
+ metrics.setCount(getCount());
+ metrics.setSummation(getSummation());
return metrics;
}
@@ -125,7 +144,8 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
this.setEntityId(remoteData.getDataStrings(0));
- this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
+ this.setCount(new DataTable(remoteData.getDataObjectStrings(0)));
+ this.setSummation(new DataTable(remoteData.getDataObjectStrings(1)));
}
@Override
@@ -135,7 +155,8 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
remoteBuilder.addDataStrings(entityId);
- remoteBuilder.addDataObjectStrings(dataset.toStorageData());
+ remoteBuilder.addDataObjectStrings(count.toStorageData());
+ remoteBuilder.addDataObjectStrings(summation.toStorageData());
return remoteBuilder;
}
@@ -146,33 +167,53 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
}
@Override
- public Class<? extends StorageBuilder> builder() {
- return HistogramFunctionBuilder.class;
+ public Class<? extends AvgHistogramFunctionBuilder> builder() {
+ return AvgHistogramFunctionBuilder.class;
}
- public static class HistogramFunctionBuilder implements StorageBuilder<HistogramFunction> {
+ public static class AvgHistogramFunctionBuilder implements StorageBuilder<AvgHistogramFunction> {
@Override
- public HistogramFunction map2Data(final Map<String, Object> dbMap) {
- HistogramFunction metrics = new HistogramFunction() {
+ public AvgHistogramFunction map2Data(final Map<String, Object> dbMap) {
+ AvgHistogramFunction metrics = new AvgHistogramFunction() {
@Override
public AcceptableValue<BucketedValues> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
+ metrics.setCount(new DataTable((String) dbMap.get(COUNT)));
+ metrics.setSummation(new DataTable((String) dbMap.get(SUMMATION)));
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
- public Map<String, Object> data2Map(final HistogramFunction storageData) {
+ public Map<String, Object> data2Map(final AvgHistogramFunction storageData) {
Map<String, Object> map = new HashMap<>();
map.put(DATASET, storageData.getDataset());
+ map.put(COUNT, storageData.getCount());
+ map.put(SUMMATION, storageData.getSummation());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof AvgHistogramFunction))
+ return false;
+ AvgHistogramFunction function = (AvgHistogramFunction) o;
+ return Objects.equals(entityId, function.entityId) &&
+ getTimeBucket() == function.getTimeBucket();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entityId, getTimeBucket());
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgHistogramPercentileFunction.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgHistogramPercentileFunction.java
index 01d767b..d88e46e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgHistogramPercentileFunction.java
@@ -22,8 +22,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.IntStream;
-import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@@ -35,26 +35,29 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
-import org.apache.skywalking.oap.server.core.analysis.metrics.PercentileMetrics;
-import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
- * PercentileFunction is the implementation of {@link PercentileMetrics} in the meter system. The major difference is
- * the PercentileFunction accepts the {@link PercentileArgument} as input rather than every single request.
+ * AvgHistogramPercentileFunction calculates percentiles based on the average of raw values over the interval (minute, hour or day).
+ *
+ * The acceptable bucket value should be the result of one of the "increase", "rate" or "irate" query functions.
+ * That means the value in a bucket is the increase, or the per-second (average or instant) rate of increase, over a specific time range.
+ * AvgHistogramPercentileFunction then calculates the percentiles from those buckets.
+ *
+ * Example:
+ * "persistence_timer_bulk_execute_latency" is a histogram; a possible PromQL expression producing acceptable bucket values is:
+ * "increase(persistence_timer_bulk_execute_latency{service="oap-server", instance="localhost:1234"}[5m])"
*/
-@MeterFunction(functionName = "percentile")
+@MeterFunction(functionName = "avgHistogramPercentile")
@Slf4j
-@EqualsAndHashCode(of = {
- "entityId",
- "timeBucket"
-})
-public abstract class PercentileFunction extends Metrics implements AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder {
+public abstract class AvgHistogramPercentileFunction extends Metrics implements AcceptableValue<AvgHistogramPercentileFunction.AvgPercentileArgument>, MultiIntValuesHolder {
public static final String DATASET = "dataset";
public static final String RANKS = "ranks";
public static final String VALUE = "value";
+ protected static final String SUMMATION = "summation";
+ protected static final String COUNT = "count";
@Setter
@Getter
@@ -66,6 +69,14 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
private DataTable percentileValues = new DataTable(10);
@Getter
@Setter
+ @Column(columnName = SUMMATION, storageOnly = true)
+ protected DataTable summation = new DataTable(30);
+ @Getter
+ @Setter
+ @Column(columnName = COUNT, storageOnly = true)
+ protected DataTable count = new DataTable(30);
+ @Getter
+ @Setter
@Column(columnName = DATASET, storageOnly = true)
private DataTable dataset = new DataTable(30);
/**
@@ -79,7 +90,7 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
private boolean isCalculated = false;
@Override
- public void accept(final MeterEntity entity, final PercentileArgument value) {
+ public void accept(final MeterEntity entity, final AvgPercentileArgument value) {
if (dataset.size() > 0) {
if (!value.getBucketedValues().isCompatible(dataset)) {
throw new IllegalArgumentException(
@@ -116,10 +127,9 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
final long[] values = value.getBucketedValues().getValues();
for (int i = 0; i < values.length; i++) {
- final long bucket = value.getBucketedValues().getBuckets()[i];
- String bucketName = bucket == Integer.MIN_VALUE ? Bucket.INFINITE_NEGATIVE : String.valueOf(bucket);
- final long bucketValue = values[i];
- dataset.valueAccumulation(bucketName, bucketValue);
+ String bucketName = String.valueOf(value.getBucketedValues().getBuckets()[i]);
+ summation.valueAccumulation(bucketName, values[i]);
+ count.valueAccumulation(bucketName, 1L);
}
this.isCalculated = false;
@@ -127,16 +137,15 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
@Override
public void combine(final Metrics metrics) {
- PercentileFunction percentile = (PercentileFunction) metrics;
+ AvgHistogramPercentileFunction percentile = (AvgHistogramPercentileFunction) metrics;
- if (!dataset.keysEqual(percentile.getDataset())) {
- log.warn("Incompatible input [{}}] for current HistogramFunction[{}], entity {}",
+ if (!summation.keysEqual(percentile.getSummation())) {
+ log.warn("Incompatible input [{}}] for current PercentileFunction[{}], entity {}",
percentile, this, entityId
);
return;
}
if (ranks.size() > 0) {
- IntList ranksOfThat = percentile.getRanks();
if (this.ranks.size() != ranks.size()) {
log.warn("Incompatible ranks size = [{}}] for current PercentileFunction[{}]",
ranks.size(), this.ranks.size()
@@ -150,7 +159,8 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
}
}
- this.dataset.append(percentile.dataset);
+ this.summation.append(percentile.summation);
+ this.count.append(percentile.count);
this.isCalculated = false;
}
@@ -158,6 +168,11 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
@Override
public void calculate() {
if (!isCalculated) {
+ final List<String> sortedKeys = summation.sortedKeys(Comparator.comparingInt(Integer::parseInt));
+ for (String key : sortedKeys) {
+ dataset.put(key, summation.get(key) / count.get(key));
+ }
+
long total = dataset.sumOfValues();
int[] roofs = new int[ranks.size()];
@@ -166,8 +181,6 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
}
int count = 0;
- final List<String> sortedKeys = dataset.sortedKeys(Comparator.comparingInt(Integer::parseInt));
-
int loopIndex = 0;
for (String key : sortedKeys) {
@@ -190,10 +203,11 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
@Override
public Metrics toHour() {
- PercentileFunction metrics = (PercentileFunction) createNew();
+ AvgHistogramPercentileFunction metrics = (AvgHistogramPercentileFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
- metrics.setDataset(getDataset());
+ metrics.setSummation(getSummation());
+ metrics.setCount(getCount());
metrics.setRanks(getRanks());
metrics.setPercentileValues(getPercentileValues());
return metrics;
@@ -201,10 +215,11 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
@Override
public Metrics toDay() {
- PercentileFunction metrics = (PercentileFunction) createNew();
+ AvgHistogramPercentileFunction metrics = (AvgHistogramPercentileFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
- metrics.setDataset(getDataset());
+ metrics.setSummation(getSummation());
+ metrics.setCount(getCount());
metrics.setRanks(getRanks());
metrics.setPercentileValues(getPercentileValues());
return metrics;
@@ -229,9 +244,10 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
this.setEntityId(remoteData.getDataStrings(0));
- this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
- this.setRanks(new IntList(remoteData.getDataObjectStrings(1)));
- this.setPercentileValues(new DataTable(remoteData.getDataObjectStrings(2)));
+ this.setSummation(new DataTable(remoteData.getDataObjectStrings(0)));
+ this.setCount(new DataTable(remoteData.getDataObjectStrings(1)));
+ this.setRanks(new IntList(remoteData.getDataObjectStrings(2)));
+ this.setPercentileValues(new DataTable(remoteData.getDataObjectStrings(3)));
}
@Override
@@ -241,7 +257,8 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
remoteBuilder.addDataStrings(entityId);
- remoteBuilder.addDataObjectStrings(dataset.toStorageData());
+ remoteBuilder.addDataObjectStrings(summation.toStorageData());
+ remoteBuilder.addDataObjectStrings(count.toStorageData());
remoteBuilder.addDataObjectStrings(ranks.toStorageData());
remoteBuilder.addDataObjectStrings(percentileValues.toStorageData());
@@ -254,28 +271,30 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
}
@Override
- public Class<? extends StorageBuilder> builder() {
- return PercentileFunctionBuilder.class;
+ public Class<? extends AvgPercentileFunctionBuilder> builder() {
+ return AvgPercentileFunctionBuilder.class;
}
@RequiredArgsConstructor
@Getter
- public static class PercentileArgument {
+ public static class AvgPercentileArgument {
private final BucketedValues bucketedValues;
private final int[] ranks;
}
- public static class PercentileFunctionBuilder implements StorageBuilder<PercentileFunction> {
+ public static class AvgPercentileFunctionBuilder implements StorageBuilder<AvgHistogramPercentileFunction> {
@Override
- public PercentileFunction map2Data(final Map<String, Object> dbMap) {
- PercentileFunction metrics = new PercentileFunction() {
+ public AvgHistogramPercentileFunction map2Data(final Map<String, Object> dbMap) {
+ AvgHistogramPercentileFunction metrics = new AvgHistogramPercentileFunction() {
@Override
- public AcceptableValue<PercentileArgument> createNew() {
+ public AcceptableValue<AvgPercentileArgument> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
+ metrics.setSummation(new DataTable((String) dbMap.get(SUMMATION)));
+ metrics.setCount(new DataTable((String) dbMap.get(COUNT)));
metrics.setRanks(new IntList((String) dbMap.get(RANKS)));
metrics.setPercentileValues(new DataTable((String) dbMap.get(VALUE)));
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
@@ -284,8 +303,10 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
}
@Override
- public Map<String, Object> data2Map(final PercentileFunction storageData) {
+ public Map<String, Object> data2Map(final AvgHistogramPercentileFunction storageData) {
Map<String, Object> map = new HashMap<>();
+ map.put(SUMMATION, storageData.getSummation());
+ map.put(COUNT, storageData.getCount());
map.put(DATASET, storageData.getDataset());
map.put(RANKS, storageData.getRanks());
map.put(VALUE, storageData.getPercentileValues());
@@ -294,4 +315,20 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
return map;
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof AvgHistogramPercentileFunction))
+ return false;
+ AvgHistogramPercentileFunction function = (AvgHistogramPercentileFunction) o;
+ return Objects.equals(entityId, function.entityId) &&
+ getTimeBucket() == function.getTimeBucket();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entityId, getTimeBucket());
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
index f1150a6..92cb798 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.HashMap;
import java.util.Map;
-import lombok.EqualsAndHashCode;
+import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@@ -41,10 +41,6 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
*/
@MeterFunction(functionName = "histogram")
@Slf4j
-@EqualsAndHashCode(of = {
- "entityId",
- "timeBucket"
-})
@ToString
public abstract class HistogramFunction extends Metrics implements AcceptableValue<BucketedValues> {
public static final String DATASET = "dataset";
@@ -175,4 +171,20 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
return map;
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof HistogramFunction))
+ return false;
+ HistogramFunction function = (HistogramFunction) o;
+ return Objects.equals(entityId, function.entityId) &&
+ getTimeBucket() == function.getTimeBucket();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entityId, getTimeBucket());
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
index 01d767b..5681c24 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
@@ -22,8 +22,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.IntStream;
-import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@@ -47,10 +47,6 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
*/
@MeterFunction(functionName = "percentile")
@Slf4j
-@EqualsAndHashCode(of = {
- "entityId",
- "timeBucket"
-})
public abstract class PercentileFunction extends Metrics implements AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder {
public static final String DATASET = "dataset";
public static final String RANKS = "ranks";
@@ -130,7 +126,7 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
PercentileFunction percentile = (PercentileFunction) metrics;
if (!dataset.keysEqual(percentile.getDataset())) {
- log.warn("Incompatible input [{}}] for current HistogramFunction[{}], entity {}",
+ log.warn("Incompatible input [{}}] for current PercentileFunction[{}], entity {}",
percentile, this, entityId
);
return;
@@ -294,4 +290,20 @@ public abstract class PercentileFunction extends Metrics implements AcceptableVa
return map;
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof PercentileFunction))
+ return false;
+ PercentileFunction function = (PercentileFunction) o;
+ return Objects.equals(entityId, function.entityId) &&
+ getTimeBucket() == function.getTimeBucket();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entityId, getTimeBucket());
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 9241f7a..e8fd957 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -134,8 +134,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
if (prepareRequests.size() > 0) {
log.debug(
- "prepare batch requests for model {}, took time: {}", model.getName(),
- System.currentTimeMillis() - start
+ "prepare batch requests for model {}, took time: {}, size: {}", model.getName(),
+ System.currentTimeMillis() - start, prepareRequests.size()
);
}
}
diff --git a/oap-server/server-fetcher-plugin/pom.xml b/oap-server/server-fetcher-plugin/pom.xml
index ea346f5..5ba2f09 100644
--- a/oap-server/server-fetcher-plugin/pom.xml
+++ b/oap-server/server-fetcher-plugin/pom.xml
@@ -33,4 +33,11 @@
<modules>
<module>prometheus-fetcher-plugin</module>
</modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.vavr</groupId>
+ <artifactId>vavr</artifactId>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/pom.xml b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/pom.xml
index 5a46450..7737ebd 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/pom.xml
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/pom.xml
@@ -35,5 +35,13 @@
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
index 0c004af..10e31ff 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
@@ -25,4 +25,6 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class PrometheusFetcherConfig extends ModuleConfig {
private boolean active;
+ private final String rulePath = "fetcher-prom-rules";
+
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index 9c4a730..7e9d40a 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -18,26 +18,90 @@
package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import io.vavr.CheckedFunction1;
+import io.vavr.Function1;
+import io.vavr.Tuple;
+import io.vavr.Tuple3;
+import io.vavr.control.Try;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.Validate;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
-import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
-import org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileFunction;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.counter.Window;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.operation.Operation;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.operation.MetricSource;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule.Rule;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule.Rules;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule.StaticConfig;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.util.prometheus.Parser;
+import org.apache.skywalking.oap.server.library.util.prometheus.Parsers;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricFamily;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricType;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
+import org.elasticsearch.common.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
public class PrometheusFetcherProvider extends ModuleProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(PrometheusFetcherProvider.class);
+
+ private final static BigDecimal SECOND_TO_MILLISECOND = BigDecimal.TEN.pow(3);
+
+ private final static String AVG_HISTOGRAM = "avgHistogram";
+
+ private final static String AVG_PERCENTILE = "avgHistogramPercentile";
+
+ private final static String AVG = "avg";
+
private final PrometheusFetcherConfig config;
+ private final OkHttpClient client = new OkHttpClient();
+
+ private List<Rule> rules;
+
+ private ScheduledExecutorService ses;
+
public PrometheusFetcherProvider() {
config = new PrometheusFetcherConfig();
}
@@ -59,97 +123,222 @@ public class PrometheusFetcherProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ if (!config.isActive()) {
+ return;
+ }
+ rules = Rules.loadRules(config.getRulePath());
+ ses = Executors.newScheduledThreadPool(rules.size(), Executors.defaultThreadFactory());
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
-
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- if (config.isActive()) {
- // TODO. This is only a demo about fetching the data and push into the calculation stream.
- final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class);
-
- service.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
- service.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
- service.create(
- "test_percentile_metrics", "percentile", ScopeType.SERVICE,
- PercentileFunction.PercentileArgument.class
- );
-
- Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- final MeterEntity servEntity = MeterEntity.newService("mock_service");
-
- // Long Avg Example
- final AcceptableValue<Long> value = service.buildMetrics("test_long_metrics", Long.class);
- value.accept(servEntity, 5L);
- value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
- service.doStreamingCalculation(value);
-
- // Histogram Example
- final AcceptableValue<BucketedValues> histogramMetrics = service.buildMetrics(
- "test_histogram_metrics", BucketedValues.class);
- value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
- histogramMetrics.accept(servEntity, new BucketedValues(
- new int[] {
- Integer.MIN_VALUE,
- 0,
- 50,
- 100,
- 250
- },
- new long[] {
- 3,
- 1,
- 4,
- 10,
- 10
- }
- ));
- service.doStreamingCalculation(histogramMetrics);
-
- // Percentile Example
- final AcceptableValue<PercentileFunction.PercentileArgument> testPercentileMetrics = service.buildMetrics(
- "test_percentile_metrics", PercentileFunction.PercentileArgument.class);
- testPercentileMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
- testPercentileMetrics.accept(
- MeterEntity.newService("service-test"),
- new PercentileFunction.PercentileArgument(
- new BucketedValues(
- // Buckets
- new int[] {
- 0,
- 51,
- 100,
- 250
- },
- // Values
- new long[] {
- 10,
- 20,
- 30,
- 40
+ if (!config.isActive()) { // Inactive fetcher: nothing is registered or scheduled.
+ return;
+ }
+ final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class); // Meter system provided by the core module.
+
+ rules.forEach(r -> {
+ r.getMetricsRules().forEach(rule -> {
+ service.create(formatMetricName(rule.getName()), rule.getOperation(), rule.getScope()); // Declare one meter per metrics rule, named "meter_<rule name>".
+ });
+ ses.scheduleAtFixedRate(new Runnable() { // One periodic scrape task per rule file.
+
+ private final Window window = new Window(); // Keeps earlier samples of this task for the counter functions.
+
+ @Override public void run() {
+ if (Objects.isNull(r.getStaticConfig())) { // A rule without staticConfig has no targets: skip this run.
+ return;
+ }
+ long now = System.currentTimeMillis(); // Single timestamp used for the time bucket of every value built in this run.
+ StaticConfig sc = r.getStaticConfig();
+ sc.getTargets().stream()
+ .map(CheckedFunction1.liftTry(url -> { // Scrape each target; the result is handed to log(...) below as a Try.
+ Request request = new Request.Builder() // URL is "http://<target><metricsPath>", with a leading "/" added to the path when missing.
+ .url(String.format("http://%s%s", url, r.getMetricsPath().startsWith("/") ? r.getMetricsPath() : "/" + r.getMetricsPath()))
+ .build();
+ List<Metric> result = new LinkedList<>();
+ try (Response response = client.newCall(request).execute()) {
+ Parser p = Parsers.text(requireNonNull(response.body()).byteStream());
+ MetricFamily mf;
+
+ while ((mf = p.parse()) != null) { // Read metric families until the parser returns null.
+ result.addAll(mf.getMetrics().stream()
+ .peek(metric -> {
+ Map<String, String> extraLabels = Maps.newHashMap(sc.getLabels()); // Static labels of the rule plus an "instance" label holding the target.
+ extraLabels.put("instance", url);
+ extraLabels.forEach((key, value) -> {
+ if (metric.getLabels().containsKey(key)) { // A scraped label with the same key is preserved under "exported_<key>" before being overwritten.
+ metric.getLabels().put("exported_" + key, metric.getLabels().get(key));
+ }
+ metric.getLabels().put(key, value);
+ });
+ })
+ .collect(toList()));
+ if (mf.getType() == MetricType.HISTOGRAM) { // Also expose "<name>_count" and "<name>_sum" as counters; only the first metric of the family is used.
+ Histogram h = (Histogram) mf.getMetrics().get(0);
+ result.add(new Counter(h.getName() + "_count", h.getLabels(), h.getSampleCount()));
+ result.add(new Counter(h.getName() + "_sum", h.getLabels(), h.getSampleSum()));
+ }
+ if (mf.getType() == MetricType.SUMMARY) { // Same "_count"/"_sum" counters for summaries, again from the first metric only.
+ Summary s = (Summary) mf.getMetrics().get(0);
+ result.add(new Counter(s.getName() + "_count", s.getLabels(), s.getSampleCount()));
+ result.add(new Counter(s.getName() + "_sum", s.getLabels(), s.getSampleSum()));
+ }
}
- ),
- // Ranks
- new int[] {
- 50,
- 90
}
+ return result;
+ }))
+ .flatMap(tryIt -> PrometheusFetcherProvider.log(tryIt, "Load metric")) // Outcome is logged at debug level; NOTE(review): a failed scrape presumably yields no elements here - confirm Try.toJavaStream().
+ .flatMap(Collection::stream)
+ .flatMap(metric -> // Pair each scraped metric with every rule source whose key equals the metric name.
+ r.getMetricsRules().stream()
+ .flatMap(rule -> rule.getSources().entrySet().stream().map(source -> Tuple.of(rule, source.getKey(), source.getValue())))
+ .filter(rule -> rule._2.equals(metric.getName()))
+ .map(rule -> Tuple.of(rule._1, rule._2, rule._3, metric))
)
- );
- service.doStreamingCalculation(testPercentileMetrics);
+ .peek(tuple -> LOG.debug("Mapped rules to metrics: {}", tuple))
+ .map(Function1.liftTry(tuple -> { // Build (operation, metric source with entity, metric); tuple is (rule, source key, source config, metric).
+ String serviceName = composeEntity(tuple._3.getRelabel().getService().stream(), tuple._4.getLabels()); // The service name is composed from labels for every scope.
+ Operation o = new Operation(tuple._1.getOperation(), tuple._1.getName(), tuple._1.getScope(), tuple._1.getPercentiles());
+ MetricSource.MetricSourceBuilder sb = MetricSource.builder();
+ sb.promMetricName(tuple._2)
+ .scale(tuple._3.getScale())
+ .counterFunction(tuple._3.getCounterFunction())
+ .range(tuple._3.getRange());
+ switch (tuple._1.getScope()) {
+ case SERVICE:
+ return Tuple.of(o, sb.entity(MeterEntity.newService(serviceName)).build(), tuple._4);
+ case SERVICE_INSTANCE:
+ String instanceName = composeEntity(tuple._3.getRelabel().getInstance().stream(), tuple._4.getLabels());
+ return Tuple.of(o, sb.entity(MeterEntity.newServiceInstance(serviceName, instanceName)).build(), tuple._4);
+ case ENDPOINT:
+ String endpointName = composeEntity(tuple._3.getRelabel().getEndpoint().stream(), tuple._4.getLabels());
+ return Tuple.of(o, sb.entity(MeterEntity.newEndpoint(serviceName, endpointName)).build(), tuple._4);
+ default:
+ throw new IllegalArgumentException("Unsupported scope" + tuple._1.getScope());
+ }
+ }))
+ .flatMap(tryIt -> PrometheusFetcherProvider.log(tryIt, "Generated entity from labels"))
+ .collect(groupingBy(Tuple3::_1, groupingBy(Tuple3::_2, mapping(Tuple3::_3, toList())))) // operation -> metric source -> metrics of that source.
+ .forEach((operation, sources) -> {
+ LOG.debug("Building metrics {} -> {}", operation, sources);
+ Try.run(() -> { // A failure in one operation is only logged (see onFailure below) and does not stop the others.
+ switch (operation.getName()) {
+ case AVG:
+ sources.forEach((source, metrics) -> {
+ AcceptableValue<Long> value = service.buildMetrics(formatMetricName(operation.getMetricName()), Long.class);
+ Double sumDouble = sum(metrics).value(); // Metrics sharing one source are summed into a single value.
+ sumDouble = window.get(source.getPromMetricName()).apply(source, sumDouble); // Apply the source's counter function, if any; the window is keyed by the Prometheus metric name.
+ value.accept(source.getEntity(), BigDecimal.valueOf(Double.isNaN(sumDouble) ? 0D : sumDouble) // NaN is replaced by 0; the value is multiplied by 10^scale and converted with longValue().
+ .multiply(BigDecimal.TEN.pow(source.getScale())).longValue());
+ value.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
+ LOG.debug("Input metric {}", value.getTimeBucket());
+ service.doStreamingCalculation(value);
+
+ generateTraffic(source.getEntity());
+ });
+ break;
+ case AVG_HISTOGRAM:
+ case AVG_PERCENTILE:
+ Validate.isTrue(sources.size() == 1, "Can't get source for histogram"); // Histogram operations accept exactly one metric source.
+ Map.Entry<MetricSource, List<Metric>> smm = sources.entrySet().iterator().next();
+ Histogram h = (Histogram) sum(smm.getValue());
+
+ long[] vv = new long[h.getBuckets().size()]; // Per-bucket values.
+ int[] bb = new int[h.getBuckets().size()]; // Bucket boundaries; bb[0] is never assigned and stays 0.
+ long v = 0L;
+ int i = 0;
+ for (Map.Entry<Double, Long> entry : h.getBuckets().entrySet()) { // NOTE(review): assumes buckets iterate in ascending order of their bound - confirm.
+ long increase = entry.getValue() - v; // Subtract the previous bucket's value, i.e. bucket values are treated as cumulative.
+ vv[i] = window.get(operation.getMetricName(), ImmutableMap.of("le", entry.getKey().toString())) // One window per bucket, keyed by metric name and an "le" label.
+ .apply(smm.getKey(), (double) increase).longValue();
+ v = entry.getValue();
+
+ if (i + 1 < h.getBuckets().size()) {
+ bb[i + 1] = BigDecimal.valueOf(entry.getKey()).multiply(SECOND_TO_MILLISECOND).intValue(); // This bucket's bound times 1000 becomes the next boundary; the last bound is not used.
+ }
+
+ i++;
+ }
+
+ if (operation.getName().equals(AVG_HISTOGRAM)) {
+ AcceptableValue<BucketedValues> heatmapMetrics = service.buildMetrics(
+ formatMetricName(operation.getMetricName()), BucketedValues.class);
+ heatmapMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
+ heatmapMetrics.accept(smm.getKey().getEntity(), new BucketedValues(bb, vv));
+ service.doStreamingCalculation(heatmapMetrics);
+ } else {
+ AcceptableValue<AvgHistogramPercentileFunction.AvgPercentileArgument> percentileMetrics =
+ service.buildMetrics(formatMetricName(operation.getMetricName()), AvgHistogramPercentileFunction.AvgPercentileArgument.class);
+ percentileMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
+ percentileMetrics.accept(smm.getKey().getEntity(),
+ new AvgHistogramPercentileFunction.AvgPercentileArgument(new BucketedValues(bb, vv), operation.getPercentiles().stream().mapToInt(Integer::intValue).toArray()));
+ service.doStreamingCalculation(percentileMetrics);
+ }
+
+ generateTraffic(smm.getKey().getEntity());
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported downSampling %s", operation.getName()));
+ }
+ }).onFailure(e -> LOG.debug("Building metric failed", e)); // Failures are reported at debug level only.
+ });
}
- }, 2, 2, TimeUnit.SECONDS);
- }
+ }, 0L, Duration.parse(r.getFetcherInterval()).getSeconds(), TimeUnit.SECONDS); // No initial delay; the period is fetcherInterval parsed by Duration.parse, in whole seconds.
+ });
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
+
+    /**
+     * Derives the meter name registered in the meter system from a rule name.
+     *
+     * @param meterRuleName name of the metrics rule
+     * @return the rule name prefixed with "meter" and joined by an underscore
+     */
+    private String formatMetricName(String meterRuleName) {
+        return String.join("_", "meter", meterRuleName);
+    }
+
+    /**
+     * Builds an entity name by looking up each key in the metric labels and joining the
+     * found values with ".".
+     *
+     * @param stream label keys, in the order their values should appear in the name
+     * @param labels labels of the scraped metric
+     * @return the label values joined by "."; an empty string when {@code stream} is empty
+     * @throws NullPointerException if {@code labels} holds no value for one of the keys
+     */
+    private String composeEntity(Stream<String> stream, Map<String, String> labels) {
+        // Supplier overload: the message renders the whole label map, so it is only built
+        // when a label is actually missing instead of once per key of every metric.
+        return stream
+            .map(key -> requireNonNull(labels.get(key), () -> String.format("Getting %s from %s failed", key, labels)))
+            .collect(Collectors.joining("."));
+    }
+
+    /**
+     * Folds the given metrics into one by repeatedly applying {@code Metric::sum}.
+     *
+     * @param metrics metrics to combine
+     * @return the combined metric
+     * @throws IllegalArgumentException if {@code metrics} is empty
+     */
+    private Metric sum(List<Metric> metrics) {
+        return metrics.stream()
+            .reduce(Metric::sum)
+            .orElseThrow(() -> new IllegalArgumentException());
+    }
+
+    /**
+     * Feeds traffic records for the entity into the metrics stream processor: always a
+     * service record, plus an instance and/or endpoint record when the entity carries
+     * those names.
+     *
+     * @param entity entity the metrics were built for
+     * @throws NullPointerException if the entity has no service name
+     */
+    private void generateTraffic(MeterEntity entity) {
+        // Read the clock once so that every record emitted by this call carries the same
+        // minute time bucket, even when the call straddles a minute boundary.
+        final long now = System.currentTimeMillis();
+
+        ServiceTraffic s = new ServiceTraffic();
+        s.setName(requireNonNull(entity.getServiceName()));
+        s.setNodeType(NodeType.Normal);
+        s.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
+        MetricsStreamProcessor.getInstance().in(s);
+
+        if (!Strings.isNullOrEmpty(entity.getInstanceName())) {
+            InstanceTraffic instanceTraffic = new InstanceTraffic();
+            instanceTraffic.setName(entity.getInstanceName());
+            instanceTraffic.setServiceId(entity.serviceId());
+            instanceTraffic.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
+            instanceTraffic.setLastPingTimestamp(now);
+            MetricsStreamProcessor.getInstance().in(instanceTraffic);
+        }
+
+        if (!Strings.isNullOrEmpty(entity.getEndpointName())) {
+            EndpointTraffic endpointTraffic = new EndpointTraffic();
+            endpointTraffic.setName(entity.getEndpointName());
+            endpointTraffic.setServiceId(entity.serviceId());
+            endpointTraffic.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));
+            MetricsStreamProcessor.getInstance().in(endpointTraffic);
+        }
+    }
+
+    /**
+     * Writes the outcome of a {@code Try} to the debug log and exposes it as a stream.
+     *
+     * @param t            outcome to report
+     * @param debugMessage prefix of the log line, for both the success and the failure case
+     * @return the result of {@code t.toJavaStream()}
+     */
+    private static <T> Stream<T> log(Try<T> t, String debugMessage) {
+        Try<T> reported = t.onSuccess(i -> LOG.debug(debugMessage + " :{}", i));
+        reported = reported.onFailure(e -> LOG.debug(debugMessage + " failed", e));
+        return reported.toJavaStream();
+    }
}
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/counter/ID.java
similarity index 64%
copy from oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/counter/ID.java
index 82271bd..c82debf 100644
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/counter/ID.java
@@ -16,23 +16,19 @@
*
*/
-package org.apache.skywalking.oap.server.telemetry.so11y;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.counter;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import com.google.common.collect.ImmutableMap;
+import lombok.EqualsAndHashCode;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
-/**
- * The module configuration of self observability.
- */
-@Setter
-@Getter
-public class So11yConfig extends ModuleConfig {
-
- private boolean prometheusExporterEnabled = false;
-
- private String prometheusExporterHost = "0.0.0.0";
+@RequiredArgsConstructor
+@EqualsAndHashCode // Value equality over name and labels: Window uses ID as the key of its sample map.
+@ToString
+class ID { // Identifies one counter sample series by a name plus an immutable label set.
- private int prometheusExporterPort = 1234;
+ private final String name; // Series name passed to Window.get(...).
+ private final ImmutableMap<String, String> labels; // Labels distinguishing series that share a name; empty when Window.get(name) is used.
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/counter/Window.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/counter/Window.java
new file mode 100644
index 0000000..35edb53
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/counter/Window.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.counter;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import io.vavr.Function2;
+import io.vavr.Tuple;
+import io.vavr.Tuple2;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import lombok.EqualsAndHashCode;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.operation.MetricSource;
+
+/**
+ * Window stores a series of counter samples in order to calculate the increase
+ * or instant rate of increase.
+ */
+/**
+ * Window stores a series of counter samples, keyed by metric name and labels, in order to
+ * calculate the increase or the per-second rate of increase of a counter.
+ *
+ * <p>The samples live in a plain {@code HashMap}, so an instance must not be shared between
+ * threads without external synchronization.
+ */
+@RequiredArgsConstructor
+@ToString
+@EqualsAndHashCode
+public class Window {
+
+    private final Map<ID, Queue<Tuple2<Long, Double>>> windows = Maps.newHashMap();
+
+    /**
+     * Same as {@link #get(String, Map)} with no labels.
+     *
+     * @param name name of the sample series
+     * @return function mapping a metric source and the current counter value to the derived value
+     */
+    public Function2<MetricSource, Double, Double> get(String name) {
+        return get(name, Collections.emptyMap());
+    }
+
+    /**
+     * Returns a function that records the current counter value in the series identified by
+     * {@code name} and {@code labels} and derives a value according to the counter function
+     * of the metric source. When the source has no counter function, the input value is
+     * returned unchanged and nothing is recorded.
+     *
+     * @param name   name of the sample series
+     * @param labels labels distinguishing series that share a name; copied, never modified
+     * @return function mapping a metric source and the current counter value to the derived value
+     */
+    public Function2<MetricSource, Double, Double> get(String name, Map<String, String> labels) {
+        ID id = new ID(name, ImmutableMap.copyOf(labels));
+        return (source, sum) -> operateCounter(id, source, sum);
+    }
+
+    /**
+     * Applies the counter function of {@code source} to the current value {@code sum}.
+     * INCREASE yields the difference to the base sample of the configured range, RATE that
+     * difference per second, and IRATE the per-second difference to the previous sample.
+     */
+    private Double operateCounter(ID id, MetricSource source, Double sum) {
+        if (source.getCounterFunction() == null) {
+            return sum;
+        }
+        // One timestamp serves both as the sample time and as the reference for elapsed time.
+        long now = System.currentTimeMillis();
+        switch (source.getCounterFunction()) {
+            case INCREASE:
+                return sum - increase(sum, id, Duration.parse(source.getRange()).toMillis(), now)._2;
+            case RATE:
+                return rate(sum, increase(sum, id, Duration.parse(source.getRange()).toMillis(), now), now);
+            case IRATE:
+                return rate(sum, increase(sum, id, 0, now), now);
+            default:
+                return sum;
+        }
+    }
+
+    /**
+     * Per-second change between the base sample and the current value. The elapsed time is
+     * kept in fractional seconds; truncating it to whole seconds would give a zero divisor
+     * for gaps shorter than one second. Without elapsed time (e.g. the very first sample of
+     * a series) the rate is 0.
+     */
+    private static Double rate(Double sum, Tuple2<Long, Double> base, long now) {
+        long elapsedMillis = now - base._1;
+        if (elapsedMillis <= 0) {
+            return 0D;
+        }
+        return (sum - base._2) * 1000D / elapsedMillis;
+    }
+
+    /**
+     * Records the current sample and returns the oldest sample of the series as the base for
+     * the calculation. The oldest sample is dropped once it is at least {@code windowSize}
+     * milliseconds old, but never when it is the only one: that is the sample just recorded
+     * and the next call needs it as its base.
+     */
+    private Tuple2<Long, Double> increase(Double value, ID id, long windowSize, long now) {
+        Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new LinkedList<>());
+        window.offer(Tuple.of(now, value));
+        Tuple2<Long, Double> oldest = window.element();
+        if (window.size() > 1 && (now - oldest._1) >= windowSize) {
+            window.remove();
+        }
+        return oldest;
+    }
+}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/operation/MetricSource.java
similarity index 63%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/operation/MetricSource.java
index 0c004af..de67005 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/operation/MetricSource.java
@@ -16,13 +16,27 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.operation;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule.CounterFunction;
+@EqualsAndHashCode // Value equality: the fetcher groups metrics by MetricSource.
+@ToString
@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+@Builder
+public class MetricSource { // Describes where a meter value comes from and how the raw Prometheus value is converted.
+ private final String promMetricName; // Name of the Prometheus metric; the fetcher fills it with the rule's source key.
+ private final MeterEntity entity; // Service, instance or endpoint the value is reported for.
+
+ private final CounterFunction counterFunction; // Optional; when null, Window passes the raw value through.
+
+ private final String range; // Window length for INCREASE and RATE, parsed with Duration.parse.
+
+ private final int scale; // The fetcher multiplies the value by 10^scale before converting it to long.
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/operation/Operation.java
similarity index 67%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/operation/Operation.java
index 0c004af..806255e 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/operation/Operation.java
@@ -16,13 +16,27 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.operation;
+import java.util.List;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
+@EqualsAndHashCode // Value equality: the fetcher groups metric sources by Operation.
+@ToString
@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+@RequiredArgsConstructor // Constructor arguments follow field order: name, metricName, scope, percentiles.
+public class Operation { // The meter function to apply for one metrics rule.
+
+ private final String name; // Function name taken from the rule's "operation"; the fetcher handles "avg", "avgHistogram" and "avgHistogramPercentile".
+
+ private final String metricName; // Name of the metrics rule; the meter name is derived from it.
+
+ private final ScopeType scope; // Scope of the rule.
+
+ private final List<Integer> percentiles; // Ranks passed to the percentile function; read only for "avgHistogramPercentile".
}
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/CounterFunction.java
similarity index 77%
copy from oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/CounterFunction.java
index c72c14d..26c3059 100644
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/CounterFunction.java
@@ -16,12 +16,8 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.so11y;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-/**
- * Self observability receiver config.
- */
-public class So11yReceiverConfig extends ModuleConfig {
+public enum CounterFunction { // Conversions Window can apply to a counter value.
+ INCREASE, RATE, IRATE // INCREASE: difference over the source's range; RATE: that difference per second; IRATE: per-second difference computed with a zero-length window.
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/MetricsRule.java
similarity index 68%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/MetricsRule.java
index 0c004af..46cdc97 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/MetricsRule.java
@@ -16,13 +16,20 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
+@Data
+@NoArgsConstructor // Instances are populated through setters when the rule YAML is loaded.
+public class MetricsRule { // One meter definition inside a rule file.
+ private String name; // Rule name; the registered meter is named "meter_<name>".
+ private ScopeType scope; // The fetcher supports SERVICE, SERVICE_INSTANCE and ENDPOINT.
+ private String operation; // Meter function name, e.g. "avg", "avgHistogram", "avgHistogramPercentile".
+ private List<Integer> percentiles; // Ranks for the percentile operation.
+ private Map<String, PrometheusMetric> sources; // Keyed by Prometheus metric name; the fetcher matches keys against scraped metric names.
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/PrometheusMetric.java
similarity index 77%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/PrometheusMetric.java
index 0c004af..622cb4b 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/PrometheusMetric.java
@@ -16,13 +16,16 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+@Data
+@NoArgsConstructor
+public class PrometheusMetric { // Per-source settings of a metrics rule.
+ private CounterFunction counterFunction; // Optional conversion applied to the scraped value.
+ private String range; // Window length used by the counter function, in the format accepted by Duration.parse.
+ private Relabel relabel; // Label keys from which the entity names are composed.
+ private int scale = 0; // Power of ten the value is multiplied by; 0 keeps the value as is.
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Relabel.java
similarity index 78%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Relabel.java
index 0c004af..95e3e50 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Relabel.java
@@ -16,13 +16,16 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+import java.util.List;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+@Data
+@NoArgsConstructor
+public class Relabel { // Lists of label keys; the fetcher joins the matching label values with "." to build entity names.
+ private List<String> service; // Keys for the service name; read for every scope.
+ private List<String> instance; // Keys for the instance name; read for SERVICE_INSTANCE scope.
+ private List<String> endpoint; // Keys for the endpoint name; read for ENDPOINT scope.
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Rule.java
similarity index 71%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Rule.java
index 0c004af..16ca2d8 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Rule.java
@@ -16,13 +16,19 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+import java.util.List;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+@Data
+@NoArgsConstructor
+public class Rule { // Content of one fetcher rule file.
+ private String name; // Set by Rules.loadRules to the file name with "." replaced by "_".
+ private String fetcherInterval; // Scrape period; the fetcher parses it with Duration.parse and uses whole seconds.
+ private String fetcherTimeout; // NOTE(review): not read by the fetcher code in this change - confirm where it is applied.
+ private String metricsPath; // Path appended to each target; a leading "/" is added when missing.
+ private StaticConfig staticConfig; // Targets and extra labels; a rule without it is never scraped.
+ private List<MetricsRule> metricsRules; // Meters to build from the scraped metrics.
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Rules.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Rules.java
new file mode 100644
index 0000000..e3c6e79
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/Rules.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Rules loads the fetcher rule files found under a resource path.
+ */
+public class Rules {
+    // Log under this class rather than under the Rule data class.
+    private static final Logger LOG = LoggerFactory.getLogger(Rules.class);
+
+    /**
+     * Loads every rule file under {@code path}. Files that cannot be read or that contain
+     * no YAML document are reported with a warning and left out of the result.
+     *
+     * @param path resource path holding the rule files
+     * @return the loaded rules, each named after its file with "." replaced by "_"
+     * @throws ModuleStartException if {@code path} cannot be found
+     */
+    public static List<Rule> loadRules(final String path) throws ModuleStartException {
+        File[] rules;
+        try {
+            rules = ResourceUtils.getPathFiles(path);
+        } catch (FileNotFoundException e) {
+            throw new ModuleStartException("Load fetcher rules failed", e);
+        }
+        return Arrays.stream(rules)
+            .map(Rules::loadRule)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Parses a single rule file; returns null when the file is unreadable or empty.
+     */
+    private static Rule loadRule(final File f) {
+        try (Reader r = new FileReader(f)) {
+            Rule rule = new Yaml().loadAs(r, Rule.class);
+            if (rule == null) {
+                // An empty document yields null; skip it instead of failing with an NPE below.
+                LOG.warn("Rule file {} is empty, ignored", f);
+                return null;
+            }
+            rule.setName(f.getName().replace(".", "_"));
+            return rule;
+        } catch (IOException e) {
+            // A dropped rule silently disables its metrics, so make the failure visible.
+            LOG.warn("Reading file {} failed", f, e);
+            return null;
+        }
+    }
+}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/StaticConfig.java
similarity index 78%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/StaticConfig.java
index 0c004af..6e0583b 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/StaticConfig.java
@@ -16,13 +16,16 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+@Data
+@NoArgsConstructor
+public class StaticConfig {
+ private List<String> targets;
+ private Map<String, String> labels;
}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/RulesTest.java
similarity index 65%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/RulesTest.java
index 0c004af..4a9bcc3 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/rule/RulesTest.java
@@ -16,13 +16,22 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.fetcher.prometheus.provider.rule;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import java.util.List;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.junit.Test;
-@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
-}
+public class RulesTest {
+
+ @Test
+ public void testFetcherPrometheusRulesLoader() throws ModuleStartException {
+ List<Rule> rr = Rules.loadRules("fetcher-prom-rules");
+
+ assertThat(rr.size(), is(1));
+ }
+
+}
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/resources/fetcher-prom-rules/localhost.yaml
similarity index 61%
rename from oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
rename to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/resources/fetcher-prom-rules/localhost.yaml
index e6616e5..9c38e0f 100644
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/resources/fetcher-prom-rules/localhost.yaml
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -13,7 +12,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-#
-org.apache.skywalking.oap.server.receiver.so11y.So11yReceiverModule
\ No newline at end of file
+fetcherInterval: PT2S
+fetcherTimeout: PT10S
+metricsPath: /metrics
+staticConfig:
+ # targets will be labeled as "instance"
+ targets:
+ - localhost:1234
+ labels:
+ app: test-oap
+metricsRules:
+ - name: instance_cpu_percentage
+ scope: SERVICE_INSTANCE
+ operation: avg
+ sources:
+ process_cpu_seconds_total:
+ counterFunction: RATE
+ range: PT1M
+ scale: 2
+ relabel:
+ service:
+ - app
+ instance:
+ - instance
diff --git a/oap-server/server-telemetry/telemetry-so11y/pom.xml b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/resources/log4j2.xml
similarity index 54%
rename from oap-server/server-telemetry/telemetry-so11y/pom.xml
rename to oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/resources/log4j2.xml
index 5005d9d..8ba63b3 100644
--- a/oap-server/server-telemetry/telemetry-so11y/pom.xml
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/test/resources/log4j2.xml
@@ -17,23 +17,16 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>server-telemetry</artifactId>
- <groupId>org.apache.skywalking</groupId>
- <version>8.0.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>telemetry-so11y</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>telemetry-prometheus</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-
-
-</project>
\ No newline at end of file
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <logger name="org.apache.skywalking.oap.server.fetcher" level="DEBUG"/>
+ <Root level="INFO">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/oap-server/server-library/library-util/pom.xml b/oap-server/server-library/library-util/pom.xml
index a8d1162..752b461 100644
--- a/oap-server/server-library/library-util/pom.xml
+++ b/oap-server/server-library/library-util/pom.xml
@@ -64,5 +64,9 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/ResourceUtils.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/ResourceUtils.java
index ba7ea1b..a0fc0c2 100644
--- a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/ResourceUtils.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/ResourceUtils.java
@@ -18,20 +18,33 @@
package org.apache.skywalking.oap.server.library.util;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
+import java.util.Objects;
public class ResourceUtils {
public static Reader read(String fileName) throws FileNotFoundException {
+ return new InputStreamReader(readToStream(fileName));
+ }
+
+ public static InputStream readToStream(String fileName) throws FileNotFoundException {
URL url = ResourceUtils.class.getClassLoader().getResource(fileName);
if (url == null) {
throw new FileNotFoundException("file not found: " + fileName);
}
- InputStream inputStream = ResourceUtils.class.getClassLoader().getResourceAsStream(fileName);
- return new InputStreamReader(inputStream);
+ return ResourceUtils.class.getClassLoader().getResourceAsStream(fileName);
+ }
+
+ /**
+ * Lists the entries directly under a classpath directory.
+ *
+ * @param path directory name, resolved through this class's class loader
+ * @return the entries of the directory
+ * @throws FileNotFoundException if {@code path} is not found on the classpath
+ * @throws NullPointerException if the resolved location cannot be listed as a directory
+ */
+ public static File[] getPathFiles(String path) throws FileNotFoundException {
+ URL url = ResourceUtils.class.getClassLoader().getResource(path);
+ if (url == null) {
+ throw new FileNotFoundException("path not found: " + path);
+ }
+ File directory;
+ try {
+ // URL#getPath() stays percent-encoded ("%20" for a space); going through a URI decodes it.
+ directory = new File(url.toURI());
+ } catch (java.net.URISyntaxException | IllegalArgumentException e) {
+ // Not a plain "file:" URI (e.g. a resource inside a jar): fall back to the raw URL path.
+ directory = new File(url.getPath());
+ }
+ return Objects.requireNonNull(directory.listFiles(), "No files in " + path);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/Parser.java
similarity index 76%
copy from oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java
copy to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/Parser.java
index c72c14d..5c36203 100644
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/Parser.java
@@ -16,12 +16,11 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.so11y;
+package org.apache.skywalking.oap.server.library.util.prometheus;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import java.io.IOException;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricFamily;
-/**
- * Self observability receiver config.
- */
-public class So11yReceiverConfig extends ModuleConfig {
+public interface Parser {
+ MetricFamily parse() throws IOException;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverModule.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/Parsers.java
similarity index 65%
rename from oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverModule.java
rename to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/Parsers.java
index 9a13b58..281e29c 100644
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverModule.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/Parsers.java
@@ -16,22 +16,13 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.so11y;
+package org.apache.skywalking.oap.server.library.util.prometheus;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import java.io.InputStream;
+import org.apache.skywalking.oap.server.library.util.prometheus.parser.TextParser;
-/**
- * Self observability receiver module.
- */
-public class So11yReceiverModule extends ModuleDefine {
- public static final String NAME = "receiver-so11y";
-
- public So11yReceiverModule() {
- super(NAME);
- }
-
- @Override
- public Class[] services() {
- return new Class[0];
+public class Parsers {
+ public static Parser text(final InputStream stream) {
+ return new TextParser(stream);
}
}
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Counter.java
similarity index 56%
copy from oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
copy to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Counter.java
index 82271bd..b515319 100644
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Counter.java
@@ -16,23 +16,33 @@
*
*/
-package org.apache.skywalking.oap.server.telemetry.so11y;
+package org.apache.skywalking.oap.server.library.util.prometheus.metrics;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import lombok.Singular;
+import lombok.ToString;
-/**
- * The module configuration of self observability.
- */
-@Setter
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
@Getter
-public class So11yConfig extends ModuleConfig {
+public class Counter extends Metric {
- private boolean prometheusExporterEnabled = false;
+ private double value;
- private String prometheusExporterHost = "0.0.0.0";
+ @lombok.Builder
+ public Counter(String name, @Singular Map<String, String> labels, double value) {
+ super(name, labels);
+ this.value = value;
+ }
- private int prometheusExporterPort = 1234;
+ @Override public Metric sum(Metric m) {
+ this.value = this.value + m.value();
+ return this;
+ }
+ @Override public Double value() {
+ return this.value;
+ }
}
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Gauge.java
similarity index 56%
copy from oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
copy to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Gauge.java
index 82271bd..1ee779b 100644
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Gauge.java
@@ -16,23 +16,33 @@
*
*/
-package org.apache.skywalking.oap.server.telemetry.so11y;
+package org.apache.skywalking.oap.server.library.util.prometheus.metrics;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import lombok.Singular;
+import lombok.ToString;
-/**
- * The module configuration of self observability.
- */
-@Setter
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
@Getter
-public class So11yConfig extends ModuleConfig {
+public class Gauge extends Metric {
- private boolean prometheusExporterEnabled = false;
+ private double value;
- private String prometheusExporterHost = "0.0.0.0";
+ @lombok.Builder
+ public Gauge(String name, @Singular Map<String, String> labels, double value) {
+ super(name, labels);
+ this.value = value;
+ }
- private int prometheusExporterPort = 1234;
+ @Override public Metric sum(Metric m) {
+ this.value = this.value + m.value();
+ return this;
+ }
+ @Override public Double value() {
+ return this.value;
+ }
}
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Histogram.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Histogram.java
new file mode 100644
index 0000000..3ffdcc1
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Histogram.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.metrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Singular;
+import lombok.ToString;
+
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+@Getter
+public class Histogram extends Metric {
+
+ private long sampleCount;
+ private double sampleSum;
+ private Map<Double, Long> buckets;
+
+ @lombok.Builder
+ public Histogram(String name, @Singular Map<String, String> labels, long sampleCount, double sampleSum,
+ @Singular Map<Double, Long> buckets) {
+ super(name, labels);
+ getLabels().remove("le");
+ this.sampleCount = sampleCount;
+ this.sampleSum = sampleSum;
+ this.buckets = buckets;
+ }
+
+ @Override public Metric sum(Metric m) {
+ Histogram h = (Histogram) m;
+ this.buckets = Stream.concat(getBuckets().entrySet().stream(), h.getBuckets().entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum, TreeMap::new));
+ this.sampleSum = this.sampleSum + h.sampleSum;
+ this.sampleCount = this.sampleCount + h.sampleCount;
+ return this;
+ }
+
+ @Override public Double value() {
+ return this.getSampleSum() * 1000 / this.getSampleCount();
+ }
+}
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Metric.java
similarity index 60%
rename from oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
rename to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Metric.java
index 82271bd..35a6d74 100644
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yConfig.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Metric.java
@@ -16,23 +16,28 @@
*
*/
-package org.apache.skywalking.oap.server.telemetry.so11y;
+package org.apache.skywalking.oap.server.library.util.prometheus.metrics;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import lombok.ToString;
-/**
- * The module configuration of self observability.
- */
-@Setter
+@EqualsAndHashCode
+@ToString
@Getter
-public class So11yConfig extends ModuleConfig {
+public abstract class Metric {
- private boolean prometheusExporterEnabled = false;
+ private final String name;
+ private final Map<String, String> labels;
- private String prometheusExporterHost = "0.0.0.0";
+ protected Metric(String name, Map<String, String> labels) {
+ this.name = name;
+ this.labels = Maps.newHashMap(labels);
+ }
- private int prometheusExporterPort = 1234;
+ public abstract Metric sum(Metric m);
-}
+ public abstract Double value();
+}
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/MetricFamily.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/MetricFamily.java
new file mode 100644
index 0000000..cd901c2
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/MetricFamily.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Contains all metrics within a family (that is, of the same name). All metrics in a family have the same type.
+ */
+@EqualsAndHashCode
+@ToString
+public class MetricFamily {
+
+ public static class Builder {
+ private String name;
+ private String help;
+ private MetricType type;
+ private List<Metric> metrics;
+
+ public Builder setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder setHelp(String help) {
+ this.help = help;
+ return this;
+ }
+
+ public Builder setType(MetricType type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder addMetric(Metric metric) {
+ if (metrics == null) {
+ metrics = new ArrayList<>();
+ }
+ metrics.add(metric);
+ return this;
+ }
+
+ public MetricFamily build() {
+ return new MetricFamily(this);
+ }
+ }
+
+ private final String name;
+ private final String help;
+ private final MetricType type;
+ private final List<Metric> metrics;
+
+ protected MetricFamily(Builder builder) {
+ if (builder.name == null) {
+ throw new IllegalArgumentException("Need to set name");
+ }
+ if (builder.type == null) {
+ throw new IllegalArgumentException("Need to set type");
+ }
+
+ Class<? extends Metric> expectedMetricClassType;
+ switch (builder.type) {
+ case COUNTER:
+ expectedMetricClassType = Counter.class;
+ break;
+ case GAUGE:
+ expectedMetricClassType = Gauge.class;
+ break;
+ case SUMMARY:
+ expectedMetricClassType = Summary.class;
+ break;
+ case HISTOGRAM:
+ expectedMetricClassType = Histogram.class;
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid type: " + builder.type);
+ }
+
+ // make sure all the metrics in the family are of the expected type
+ if (builder.metrics != null && !builder.metrics.isEmpty()) {
+ for (Metric metric : builder.metrics) {
+ if (!expectedMetricClassType.isInstance(metric)) {
+ throw new IllegalArgumentException(
+ String.format("Metric type is [%s] so instances of class [%s] are expected, "
+ + "but got metric object of type [%s]",
+ builder.type, expectedMetricClassType.getName(), metric.getClass().getName()));
+ }
+ }
+
+ }
+
+ this.name = builder.name;
+ this.help = builder.help;
+ this.type = builder.type;
+ this.metrics = builder.metrics;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHelp() {
+ return help;
+ }
+
+ public MetricType getType() {
+ return type;
+ }
+
+ public List<Metric> getMetrics() {
+ if (metrics == null) {
+ return Collections.emptyList();
+ }
+ return metrics;
+ }
+}
+
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/MetricType.java
similarity index 77%
rename from oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java
rename to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/MetricType.java
index c72c14d..a056d5c 100644
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverConfig.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/MetricType.java
@@ -16,12 +16,8 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.so11y;
+package org.apache.skywalking.oap.server.library.util.prometheus.metrics;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-/**
- * Self observability receiver config.
- */
-public class So11yReceiverConfig extends ModuleConfig {
-}
+public enum MetricType {
+ COUNTER, GAUGE, SUMMARY, HISTOGRAM
+}
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Summary.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Summary.java
new file mode 100644
index 0000000..1088336
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/metrics/Summary.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.metrics;
+
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Singular;
+import lombok.ToString;
+
+/**
+ * A summary metric: the observation count, the observation sum and the reported quantile values.
+ */
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+@Getter
+public class Summary extends Metric {
+
+ private long sampleCount;
+ private double sampleSum;
+ private final Map<Double, Double> quantiles;
+
+ @lombok.Builder
+ public Summary(String name, @Singular Map<String, String> labels, long sampleCount, double sampleSum,
+ @Singular Map<Double, Double> quantiles) {
+ super(name, labels);
+ // Drop the per-sample "quantile" entry from this metric's own label map.
+ getLabels().remove("quantile");
+ this.sampleCount = sampleCount;
+ this.sampleSum = sampleSum;
+ this.quantiles = quantiles;
+ }
+
+ /**
+ * Adds the sample count and sample sum of another summary to this one; the quantiles are left untouched.
+ *
+ * @param m the metric to add, which must be a {@link Summary}
+ * @return this instance, matching the other {@link Metric} implementations
+ * @throws ClassCastException if {@code m} is not a {@link Summary}
+ */
+ @Override public Metric sum(Metric m) {
+ Summary s = (Summary) m;
+ this.sampleCount = this.sampleCount + s.getSampleCount();
+ this.sampleSum = this.sampleSum + s.getSampleSum();
+ return this;
+ }
+
+ /**
+ * @return {@code sampleSum * 1000 / sampleCount}; NaN or infinite when {@code sampleCount} is 0
+ */
+ @Override public Double value() {
+ return this.getSampleSum() * 1000 / this.getSampleCount();
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/Context.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/Context.java
new file mode 100644
index 0000000..01c3e9d
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/Context.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.parser;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricFamily;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricType;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
+import org.apache.skywalking.oap.server.library.util.prometheus.parser.sample.TextSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Context {
+ private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+ public MetricFamily metricFamily;
+
+ public String name = "";
+ public String help = "";
+ public MetricType type = null;
+ public List<String> allowedNames = new ArrayList<>();
+ public List<TextSample> samples = new ArrayList<>();
+
+ /**
+ * Records the metric type declared for the current family and rebuilds the list of sample names
+ * accepted for that type.
+ *
+ * @param type the type keyword, matched case-insensitively against the {@link MetricType} constants
+ * @throws IllegalArgumentException if {@code type} names no {@link MetricType} constant
+ */
+ void addAllowedNames(String type) {
+ // Locale.ROOT keeps the lookup stable: under a Turkish default locale "histogram" would upper-case
+ // with a dotted capital I and fail MetricType.valueOf.
+ this.type = MetricType.valueOf(type.toUpperCase(java.util.Locale.ROOT));
+ allowedNames.clear();
+ switch (this.type) {
+ case COUNTER:
+ case GAUGE:
+ allowedNames.add(name);
+ break;
+ case SUMMARY:
+ allowedNames.add(name + "_count");
+ allowedNames.add(name + "_sum");
+ allowedNames.add(name);
+ break;
+ case HISTOGRAM:
+ allowedNames.add(name + "_count");
+ allowedNames.add(name + "_sum");
+ allowedNames.add(name + "_bucket");
+ break;
+ }
+ }
+
+ void clear() {
+ name = "";
+ help = "";
+ type = null;
+ allowedNames.clear();
+ samples.clear();
+ }
+
+ void end() {
+ if (metricFamily != null) {
+ return;
+ }
+
+ MetricFamily.Builder metricFamilyBuilder = new MetricFamily.Builder();
+ metricFamilyBuilder.setName(name);
+ metricFamilyBuilder.setHelp(help);
+ metricFamilyBuilder.setType(type);
+
+ if (samples.size() < 1) {
+ return;
+ }
+ switch (type) {
+ case GAUGE:
+ samples.forEach(textSample -> metricFamilyBuilder
+ .addMetric(Gauge.builder()
+ .name(name)
+ .value(convertStringToDouble(textSample.getValue()))
+ .labels(textSample.getLabels())
+ .build()));
+ break;
+ case COUNTER:
+ samples.forEach(textSample -> metricFamilyBuilder
+ .addMetric(Counter.builder()
+ .name(name)
+ .value(convertStringToDouble(textSample.getValue()))
+ .labels(textSample.getLabels())
+ .build()));
+ break;
+ case HISTOGRAM:
+ Histogram.HistogramBuilder hBuilder = Histogram.builder();
+ hBuilder.name(name);
+ samples.forEach(textSample -> {
+ if (textSample.getName().endsWith("_count")) {
+ hBuilder.sampleCount((long) convertStringToDouble(textSample.getValue()));
+ } else if (textSample.getName().endsWith("_sum")) {
+ hBuilder.sampleSum(convertStringToDouble(textSample.getValue()));
+ } else if (textSample.getLabels().containsKey("le")) {
+ hBuilder.bucket(
+ convertStringToDouble(textSample.getLabels().remove("le")),
+ (long) convertStringToDouble(textSample.getValue())
+ );
+ }
+ });
+ metricFamilyBuilder.addMetric(hBuilder.build());
+ break;
+ case SUMMARY:
+ Summary.SummaryBuilder sBuilder = Summary.builder();
+ sBuilder.name(name);
+ samples.forEach(textSample -> {
+ if (textSample.getName().endsWith("_count")) {
+ sBuilder.sampleCount((long) convertStringToDouble(textSample.getValue()));
+ } else if (textSample.getName().endsWith("_sum")) {
+ sBuilder.sampleSum(convertStringToDouble(textSample.getValue()));
+ } else if (textSample.getLabels().containsKey("quantile")) {
+ sBuilder.quantile(
+ convertStringToDouble(textSample.getLabels().remove("quantile")),
+ convertStringToDouble(textSample.getValue())
+ );
+ }
+ });
+ metricFamilyBuilder.addMetric(sBuilder.build());
+ break;
+ }
+ metricFamily = metricFamilyBuilder.build();
+ }
+
+ private static double convertStringToDouble(String valueString) {
+ double doubleValue;
+ if (valueString.equalsIgnoreCase("NaN")) {
+ doubleValue = Double.NaN;
+ } else if (valueString.equalsIgnoreCase("+Inf")) {
+ doubleValue = Double.POSITIVE_INFINITY;
+ } else if (valueString.equalsIgnoreCase("-Inf")) {
+ doubleValue = Double.NEGATIVE_INFINITY;
+ } else {
+ doubleValue = Double.parseDouble(valueString);
+ }
+ return doubleValue;
+ }
+}
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/TextParser.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/TextParser.java
new file mode 100644
index 0000000..e891f7f
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/TextParser.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.parser;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.skywalking.oap.server.library.util.prometheus.Parser;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricFamily;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricType;
+import org.apache.skywalking.oap.server.library.util.prometheus.parser.sample.TextSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pull parser for the Prometheus text exposition format. Each call to {@link #parse()} consumes
+ * lines from the stream until one metric family has been collected.
+ */
+public class TextParser implements Parser {
+    private static final Logger LOG = LoggerFactory.getLogger(TextParser.class);
+
+    private final BufferedReader reader;
+
+    /**
+     * A line that was read while detecting the end of the previous metric family. It belongs to the
+     * next family, so the next {@link #parse()} call starts with it instead of reading the stream.
+     */
+    private String lastLineReadFromStream;
+
+    /**
+     * @param inputStream stream of Prometheus text-format data; it is decoded as UTF-8 and is not
+     *                    closed by this parser
+     */
+    public TextParser(final InputStream inputStream) {
+        // The exposition format is UTF-8; do not depend on the platform default charset.
+        this.reader = new BufferedReader(
+            new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Reads the next metric family from the stream. Lines that cannot be processed are logged at
+     * debug level and skipped.
+     *
+     * @return the next metric family, or {@code null} when no further family is available
+     * @throws IOException if reading from the underlying stream fails
+     */
+    @Override
+    public MetricFamily parse() throws IOException {
+        String line;
+        if (lastLineReadFromStream != null) {
+            // Resume with the line that terminated the previous family.
+            line = lastLineReadFromStream;
+            lastLineReadFromStream = null;
+        } else {
+            line = reader.readLine();
+        }
+        if (line == null) {
+            return null;
+        }
+
+        Context ctx = new Context();
+        while (line != null) {
+            line = line.trim();
+
+            try {
+                if (parseLine(line, ctx)) {
+                    break;
+                }
+            } catch (Exception e) {
+                // Best effort: a single malformed line must not abort the whole scrape.
+                LOG.debug("Failed to process line - it will be ignored: {}", line, e);
+            }
+
+            line = reader.readLine();
+        }
+
+        if (!ctx.name.isEmpty()) {
+            // Presumably assembles ctx.metricFamily from the collected samples - see Context#end().
+            ctx.end();
+        }
+
+        return ctx.metricFamily;
+    }
+
+    /**
+     * Feeds a single trimmed line into the metric family that is being collected.
+     *
+     * @param line the trimmed line
+     * @param ctx  accumulator of the current metric family
+     * @return {@code true} if the line belongs to another family, in which case it is kept in
+     * {@link #lastLineReadFromStream} and the current family is complete; {@code false} if the line
+     * was consumed or ignored
+     */
+    private boolean parseLine(String line, Context ctx) {
+        if (line.isEmpty()) {
+            return false;
+        }
+        if (line.charAt(0) == '#') {
+            // "# HELP <name> <text>" or "# TYPE <name> <type>"; anything else is a plain comment.
+            String[] parts = line.split("[ \t]+", 4);
+            if (parts.length < 3) {
+                return false;
+            }
+            if (parts[1].equals("HELP")) {
+                if (!parts[2].equals(ctx.name)) {
+                    if (!ctx.name.isEmpty()) {
+                        this.lastLineReadFromStream = line;
+                        return true;
+                    }
+                    ctx.clear();
+                    ctx.name = parts[2];
+                    // Default type until a TYPE line says otherwise.
+                    ctx.type = MetricType.GAUGE;
+                    ctx.allowedNames.add(parts[2]);
+                }
+                if (parts.length == 4) {
+                    // NOTE(review): escapeJava adds escaping, while HELP text in the exposition
+                    // format is already escaped; unescaping may have been intended - confirm.
+                    ctx.help = StringEscapeUtils.escapeJava(parts[3]);
+                }
+            } else if (parts[1].equals("TYPE")) {
+                if (!parts[2].equals(ctx.name)) {
+                    if (!ctx.name.isEmpty()) {
+                        this.lastLineReadFromStream = line;
+                        return true;
+                    }
+                    ctx.clear();
+                    ctx.name = parts[2];
+                }
+                if (parts.length < 4) {
+                    // "# TYPE <name>" without a type: there is nothing to register.
+                    LOG.debug("Ignoring a TYPE line without a metric type: {}", line);
+                    return false;
+                }
+                ctx.addAllowedNames(parts[3]);
+            }
+            return false;
+        }
+        TextSample sample = TextSample.parse(line);
+        if (!ctx.allowedNames.contains(sample.getName())) {
+            if (!ctx.name.isEmpty()) {
+                // A sample of another metric ends the current family.
+                this.lastLineReadFromStream = line;
+                return true;
+            }
+            ctx.clear();
+            LOG.debug("Ignoring an unexpected metric: {}", line);
+        } else {
+            ctx.samples.add(sample);
+        }
+        return false;
+    }
+}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/Context.java
similarity index 66%
copy from oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
copy to oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/Context.java
index 0c004af..f77d1e1 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/Context.java
@@ -16,13 +16,15 @@
*
*/
-package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
+package org.apache.skywalking.oap.server.library.util.prometheus.parser.sample;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Getter
-public class PrometheusFetcherConfig extends ModuleConfig {
- private boolean active;
+import java.util.LinkedHashMap;
+import java.util.Map;
+/**
+ * Mutable scratch space filled by the {@code State} machine while one sample line is scanned
+ * character by character.
+ */
+class Context {
+ // Characters of the metric name collected so far.
+ StringBuilder name = new StringBuilder();
+ // Name of the label currently being read; emptied once the label is stored in labels.
+ StringBuilder labelname = new StringBuilder();
+ // Value of the label currently being read; emptied once the label is stored in labels.
+ StringBuilder labelvalue = new StringBuilder();
+ // Characters of the sample value collected so far.
+ StringBuilder value = new StringBuilder();
+ // Completed label name/value pairs, kept in the order they were stored.
+ Map<String, String> labels = new LinkedHashMap<>();
}
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/State.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/State.java
new file mode 100644
index 0000000..2836d3e
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/State.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.parser.sample;
+
+enum State {
+ NAME {
+ @Override
+ State nextState(char c, Context ctx) {
+ if (c == '{') {
+ return START_OF_LABEL_NAME;
+ } else if (isWhitespace(c)) {
+ return END_OF_NAME;
+ }
+ ctx.name.append(c);
+ return this;
+ }
+ },
+ END_OF_NAME {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (isWhitespace(c)) {
+ return this;
+ } else if (c == '{') {
+ return START_OF_LABEL_NAME;
+ }
+ ctx.value.append(c);
+ return VALUE;
+ }
+ },
+ START_OF_LABEL_NAME {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (isWhitespace(c)) {
+ return this;
+ } else if (c == '}') {
+ return END_OF_LABELS;
+ }
+ ctx.labelname.append(c);
+ return LABEL_NAME;
+ }
+ },
+ LABEL_NAME {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (c == '=') {
+ return LABEL_VALUE_QUOTE;
+ } else if (c == '}') {
+ return END_OF_LABELS;
+ } else if (State.isWhitespace(c)) {
+ return LABEL_VALUE_EQUALS;
+ }
+ ctx.labelname.append(c);
+ return this;
+ }
+ },
+ LABEL_VALUE_EQUALS {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (c == '=') {
+ return LABEL_VALUE_QUOTE;
+ } else if (State.isWhitespace(c)) {
+ return this;
+ }
+ return INVALID;
+ }
+ },
+ LABEL_VALUE_QUOTE {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (c == '"') {
+ return LABEL_VALUE;
+ } else if (State.isWhitespace(c)) {
+ return this;
+ }
+ return INVALID;
+ }
+ },
+ LABEL_VALUE {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (c == '\\') {
+ return LABEL_VALUE_SLASH;
+ } else if (c == '"') {
+ ctx.labels.put(ctx.labelname.toString(), ctx.labelvalue.toString());
+ ctx.labelname.setLength(0);
+ ctx.labelvalue.setLength(0);
+ return NEXT_LABEL;
+ }
+ ctx.labelvalue.append(c);
+ return this;
+ }
+ },
+ LABEL_VALUE_SLASH {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (c == '\\') {
+ ctx.labelvalue.append('\\');
+ } else if (c == 'n') {
+ ctx.labelvalue.append('\n');
+ } else if (c == '"') {
+ ctx.labelvalue.append('"');
+ }
+ ctx.labelvalue.append('\\').append(c);
+ return LABEL_VALUE;
+ }
+ },
+ NEXT_LABEL {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (c == ',') {
+ return LABEL_NAME;
+ } else if (c == '}') {
+ return END_OF_LABELS;
+ } else if (State.isWhitespace(c)) {
+ return this;
+ }
+ return INVALID;
+ }
+ },
+ END_OF_LABELS {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (State.isWhitespace(c)) {
+ return this;
+ }
+ ctx.value.append(c);
+ return VALUE;
+ }
+ },
+ VALUE {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ if (State.isWhitespace(c)) {
+ // TODO: timestamps
+ return END;
+ }
+ ctx.value.append(c);
+ return this;
+ }
+ },
+ END {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ throw new IllegalStateException();
+ }
+ },
+ INVALID {
+ @Override
+ State nextState(final char c, final Context ctx) {
+ throw new IllegalStateException();
+ }
+ };
+
+ abstract State nextState(char c, Context ctx);
+
+ private static boolean isWhitespace(char c) {
+ return c == ' ' || c == '\t';
+ }
+}
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/TextSample.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/TextSample.java
new file mode 100644
index 0000000..53553b7
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/sample/TextSample.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.parser.sample;
+
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * One sample line of the Prometheus text format, split into metric name, labels and raw value.
+ * The original line is retained as well.
+ */
+@Getter
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class TextSample {
+
+    private final String name;
+    private final Map<String, String> labels;
+    private final String value;
+    private final String line;
+
+    /**
+     * Runs the {@link State} machine over the line, one character at a time, starting in
+     * {@link State#NAME}. Scanning stops at the end of the line or as soon as the machine reaches
+     * {@link State#END}.
+     *
+     * @param line a single sample line
+     * @return the sample built from whatever the machine collected
+     * @throws IllegalStateException if the machine reaches {@link State#INVALID}
+     */
+    public static TextSample parse(String line) {
+        final Context scratch = new Context();
+        State current = State.NAME;
+        int offset = 0;
+        while (offset < line.length()) {
+            final char ch = line.charAt(offset);
+            current = current.nextState(ch, scratch);
+            if (current == State.END) {
+                break;
+            }
+            if (current == State.INVALID) {
+                throw new IllegalStateException(String.format("At offset %d, character is %c", offset, ch));
+            }
+            offset++;
+        }
+        return new TextSample(scratch.name.toString(), scratch.labels, scratch.value.toString(), line);
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/TextParserTest.java b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/TextParserTest.java
new file mode 100644
index 0000000..9554ada
--- /dev/null
+++ b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/prometheus/parser/TextParserTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util.prometheus.parser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricFamily;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.MetricType;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+public class TextParserTest {
+
+ Queue<MetricFamily> expectedMfs = new LinkedList<>();
+
+ @Before
+ public void setup() {
+ expectedMfs.offer(new MetricFamily.Builder()
+ .setName("http_requests_total")
+ .setType(MetricType.COUNTER)
+ .setHelp("The total number of HTTP requests.")
+ .addMetric(Counter.builder()
+ .name("http_requests_total")
+ .label("method", "post")
+ .label("code", "200")
+ .value(1027D)
+ .build())
+ .addMetric(Counter.builder()
+ .name("http_requests_total")
+ .label("method", "post")
+ .label("code", "400")
+ .value(3D)
+ .build())
+ .build());
+ expectedMfs.offer(new MetricFamily.Builder()
+ .setName("http_request_duration_seconds")
+ .setType(MetricType.HISTOGRAM)
+ .setHelp("A histogram of the request duration.")
+ .addMetric(Histogram.builder()
+ .name("http_request_duration_seconds")
+ .sampleCount(144320L)
+ .sampleSum(53423.0D)
+ .bucket(0.05D, 24054L)
+ .bucket(0.1D, 33444L)
+ .bucket(0.2D, 100392L)
+ .bucket(0.5D, 129389L)
+ .bucket(1.0D, 133988L)
+ .bucket(Double.POSITIVE_INFINITY, 144320L)
+ .build())
+ .build());
+ expectedMfs.offer(new MetricFamily.Builder()
+ .setName("rpc_duration_seconds")
+ .setType(MetricType.SUMMARY)
+ .setHelp("A summary of the RPC duration in seconds.")
+ .addMetric(Summary.builder()
+ .name("rpc_duration_seconds")
+ .sampleCount(2693L)
+ .sampleSum(1.7560473E7D)
+ .quantile(0.01D, 3102D)
+ .quantile(0.05D, 3272D)
+ .quantile(0.5D, 4773D)
+ .quantile(0.9D, 9001D)
+ .quantile(0.99D, 76656D)
+ .build())
+ .build());
+ }
+
+ @Test
+ public void parseTextSuccessfully() throws IOException {
+ try (InputStream is = ResourceUtils.readToStream("testdata/prometheus.txt")) {
+ TextParser parser = new TextParser(is);
+ MetricFamily mf;
+ int mfNum = 0;
+ while ((mf = parser.parse()) != null) {
+ mfNum++;
+ MetricFamily expected = expectedMfs.poll();
+ assertNotNull(expected);
+ assertThat(mf, is(expected));
+ }
+ assertThat(mfNum , is(3));
+ }
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-library/library-util/src/test/resources/testdata/prometheus.txt b/oap-server/server-library/library-util/src/test/resources/testdata/prometheus.txt
new file mode 100644
index 0000000..b6eec13
--- /dev/null
+++ b/oap-server/server-library/library-util/src/test/resources/testdata/prometheus.txt
@@ -0,0 +1,36 @@
+# HELP http_requests_total The total number of HTTP requests.
+# TYPE http_requests_total counter
+http_requests_total{method="post",code="200"} 1027 1395066363000
+http_requests_total{method="post",code="400"} 3 1395066363000
+
+# Escaping in label values:
+msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9
+
+# Minimalistic line:
+metric_without_timestamp_and_labels 12.47
+
+# A weird metric from before the epoch:
+something_weird{problem="division by zero"} +Inf -3982045
+
+# A histogram, which has a pretty complex representation in the text format:
+# HELP http_request_duration_seconds A histogram of the request duration.
+# TYPE http_request_duration_seconds histogram
+http_request_duration_seconds_bucket{le="0.05"} 24054
+http_request_duration_seconds_bucket{le="0.1"} 33444
+http_request_duration_seconds_bucket{le="0.2"} 100392
+http_request_duration_seconds_bucket{le="0.5"} 129389
+http_request_duration_seconds_bucket{le="1"} 133988
+http_request_duration_seconds_bucket{le="+Inf"} 144320
+http_request_duration_seconds_sum 53423
+http_request_duration_seconds_count 144320
+
+# Finally a summary, which has a complex representation, too:
+# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
+# TYPE rpc_duration_seconds summary
+rpc_duration_seconds{quantile="0.01"} 3102
+rpc_duration_seconds{quantile="0.05"} 3272
+rpc_duration_seconds{quantile="0.5"} 4773
+rpc_duration_seconds{quantile="0.9"} 9001
+rpc_duration_seconds{quantile="0.99"} 76656
+rpc_duration_seconds_sum 1.7560473e+07
+rpc_duration_seconds_count 2693
diff --git a/oap-server/server-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/pom.xml
index 7d6acb8..d17b204 100644
--- a/oap-server/server-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/pom.xml
@@ -39,7 +39,6 @@
<module>skywalking-clr-receiver-plugin</module>
<module>jaeger-receiver-plugin</module>
<module>receiver-proto</module>
- <module>skywalking-so11y-receiver-plugin</module>
<module>skywalking-profile-receiver-plugin</module>
</modules>
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/pom.xml
deleted file mode 100644
index a2cc4fa..0000000
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>server-receiver-plugin</artifactId>
- <groupId>org.apache.skywalking</groupId>
- <version>8.0.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>skywalking-so11y-receiver-plugin</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>telemetry-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-
-
-</project>
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverModuleProvider.java
deleted file mode 100644
index 136ccc9..0000000
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/so11y/So11yReceiverModuleProvider.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.so11y;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.NodeType;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
-import org.apache.skywalking.oap.server.core.oal.rt.CoreOALDefine;
-import org.apache.skywalking.oap.server.core.source.GCPhrase;
-import org.apache.skywalking.oap.server.core.source.MemoryPoolType;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMCPU;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMGC;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMMemory;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMMemoryPool;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.MetricFamily;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCollector;
-import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.summingDouble;
-
-/**
- * Self observability receiver provider.
- */
-public class So11yReceiverModuleProvider extends ModuleProvider {
-
- private static final Logger logger = LoggerFactory.getLogger(So11yReceiverModuleProvider.class);
-
- private static final String SERVICE_NAME = "SkyWalking";
-
- private static final int RUN_RATE_SECONDS = 5;
-
- private final long[] lastNewGc = new long[] {
- 0L,
- 0L
- };
-
- private final long[] lastOldGc = new long[] {
- 0L,
- 0L
- };
-
- private String serviceId;
-
- private String serviceInstanceId;
-
- private String serviceInstanceName;
-
- private double lastCpuSeconds = -1;
-
- private SourceReceiver sourceReceiver;
-
- @Override
- public String name() {
- return "default";
- }
-
- @Override
- public Class<? extends ModuleDefine> module() {
- return So11yReceiverModule.class;
- }
-
- @Override
- public ModuleConfig createConfigBeanIfAbsent() {
- return new So11yReceiverConfig();
- }
-
- @Override
- public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- }
-
- @Override
- public void start() throws ServiceNotProvidedException, ModuleStartException {
- // load official analysis
- getManager().find(CoreModule.NAME)
- .provider()
- .getService(OALEngineLoaderService.class)
- .load(CoreOALDefine.INSTANCE);
-
- sourceReceiver = getManager().find(CoreModule.NAME).provider().getService(SourceReceiver.class);
- MetricsCollector collector = getManager().find(TelemetryModule.NAME)
- .provider()
- .getService(MetricsCollector.class);
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("So11y-receiver-%s")
- .build()).scheduleAtFixedRate(() -> {
- register();
- Iterable<MetricFamily> mfs = collector.collect();
- Map<String, MetricFamily> metricsIndex = new HashMap<>();
- for (MetricFamily each : mfs) {
- if (each.samples.size() < 1) {
- continue;
- }
- metricsIndex.put(each.name, each);
- }
- writeCpuUsage(metricsIndex);
- writeJvmMemory(metricsIndex);
- writeJvmMemoryPool(metricsIndex);
- writeGC(metricsIndex);
- }, RUN_RATE_SECONDS, RUN_RATE_SECONDS, TimeUnit.SECONDS);
- }
-
- @Override
- public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
-
- }
-
- @Override
- public String[] requiredModules() {
- return new String[] {
- TelemetryModule.NAME,
- CoreModule.NAME
- };
- }
-
- private void writeGC(Map<String, MetricFamily> metricsIndex) {
- if (!metricsIndex.containsKey("jvm_gc_collection_seconds")) {
- return;
- }
- List<String> newGC = ImmutableList.of("PS Scavenge", "ParNew", "G1 Young Generation", "Copy");
- List<String> oldGC = ImmutableList.of(
- "PS MarkSweep", "ConcurrentMarkSweep", "G1 Old Generation", "MarkSweepCompact");
- metricsIndex.get("jvm_gc_collection_seconds").samples.stream().map(sample -> {
- int index = Iterables.indexOf(sample.labelNames, i -> Objects.equals(i, "gc"));
- if (index < 0) {
- return null;
- }
- String gcPhrase = sample.labelValues.get(index);
- GCMetricType type = sample.name.contains("sum") ? GCMetricType.SUM : GCMetricType.COUNT;
- double value = type == GCMetricType.SUM ? sample.value * 1000 : sample.value;
- if (newGC.contains(gcPhrase)) {
- return new GCMetric(GCPhrase.NEW, type, value);
- } else if (oldGC.contains(gcPhrase)) {
- return new GCMetric(GCPhrase.OLD, type, value);
- }
- throw new RuntimeException(String.format("Unsupported gc phrase %s", gcPhrase));
- }).filter(Objects::nonNull).collect(groupingBy(GCMetric::getPhrase)).forEach((gcPhrase, gcMetrics) -> {
- ServiceInstanceJVMGC gc = new ServiceInstanceJVMGC();
- gc.setId(serviceInstanceId);
- gc.setName(serviceInstanceName);
- gc.setServiceId(serviceId);
- gc.setServiceName(SERVICE_NAME);
- gc.setPhrase(gcPhrase);
- long[] lastGc = gcPhrase == GCPhrase.NEW ? lastNewGc : lastOldGc;
- gcMetrics.stream().filter(m -> m.type.equals(GCMetricType.COUNT)).findFirst().ifPresent(m -> {
- gc.setCount(m.getValue().longValue() - lastGc[0]);
- lastGc[0] = m.getValue().longValue();
- });
- gcMetrics.stream().filter(m -> m.type.equals(GCMetricType.SUM)).findFirst().ifPresent(m -> {
- gc.setTime(m.getValue().longValue() - lastGc[1]);
- lastGc[1] = m.getValue().longValue();
- });
- gc.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
- if (logger.isDebugEnabled()) {
- logger.debug("Write {} {}counts {}ms to {}", gc.getPhrase(), gc.getCount(), gc.getTime(), gc.getName());
- }
- sourceReceiver.receive(gc);
- });
- }
-
- private void writeJvmMemoryPool(Map<String, MetricFamily> metricsIndex) {
- List<MetricSetter<ServiceInstanceJVMMemoryPool>> setterList = ImmutableList.of(
- new MetricSetter<>("jvm_memory_pool_bytes_used", (m, v) -> m
- .setUsed(v.longValue())),
- new MetricSetter<>("jvm_memory_pool_bytes_committed", (m, v) -> m.setCommitted(v.longValue())),
- new MetricSetter<>("jvm_memory_pool_bytes_max", (m, v) -> m
- .setMax(v.longValue())),
- new MetricSetter<>("jvm_memory_pool_bytes_init", (m, v) -> m.setInit(v.longValue()))
- );
- if (setterList.stream().anyMatch(i -> !metricsIndex.containsKey(i.name))) {
- return;
- }
- Map<MemoryPoolType, ServiceInstanceJVMMemoryPool> poolMap = new HashMap<>();
- setterList.forEach(setter -> metricsIndex.get(setter.name).samples.stream()
- .map(sample -> {
- int index = Iterables.indexOf(
- sample.labelNames, i -> Objects
- .equals(i, "pool"));
- if (index < 0) {
- return null;
- }
- String poolType = sample.labelValues.get(
- index);
- if (poolType.contains("Code")) {
- return new PoolMetric(
- MemoryPoolType.CODE_CACHE_USAGE,
- sample.value
- );
- } else if (poolType.contains("Eden")) {
- return new PoolMetric(
- MemoryPoolType.NEWGEN_USAGE,
- sample.value
- );
- } else if (poolType.contains(
- "Survivor")) {
- return new PoolMetric(
- MemoryPoolType.SURVIVOR_USAGE,
- sample.value
- );
- } else if (poolType.contains("Old")) {
- return new PoolMetric(
- MemoryPoolType.OLDGEN_USAGE,
- sample.value
- );
- } else if (poolType.contains(
- "Metaspace")) {
- return new PoolMetric(
- MemoryPoolType.METASPACE_USAGE,
- sample.value
- );
- } else if (poolType.contains(
- "Perm") || poolType
- .contains("Compressed Class Space")) {
- return new PoolMetric(
- MemoryPoolType.PERMGEN_USAGE,
- sample.value
- );
- }
- throw new RuntimeException(
- String.format(
- "Unknown pool type %s",
- poolType
- ));
- })
- .filter(Objects::nonNull)
- .collect(groupingBy(
- PoolMetric::getType,
- summingDouble(
- PoolMetric::getValue)
- ))
- .forEach((memoryPoolType, value) -> {
- if (!poolMap.containsKey(
- memoryPoolType)) {
- ServiceInstanceJVMMemoryPool pool = new ServiceInstanceJVMMemoryPool();
- pool.setId(serviceInstanceId);
- pool.setName(serviceInstanceName);
- pool.setServiceId(serviceId);
- pool.setServiceName(SERVICE_NAME);
- pool.setPoolType(memoryPoolType);
- pool.setTimeBucket(
- TimeBucket.getMinuteTimeBucket(
- System
- .currentTimeMillis()));
- poolMap.put(memoryPoolType, pool);
- }
- ServiceInstanceJVMMemoryPool pool = poolMap
- .get(memoryPoolType);
- setter.delegated.accept(pool, value);
- }));
- poolMap.values().forEach(p -> {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Write {} {}-{}-{}-{} to {}", p.getPoolType(), humanReadableByteCount(p.getInit(), false),
- humanReadableByteCount(p
- .getUsed(), false), humanReadableByteCount(p.getCommitted(), false),
- humanReadableByteCount(p.getMax(), false), p
- .getName()
- );
- }
- sourceReceiver.receive(p);
- });
- }
-
- private void writeJvmMemory(final Map<String, MetricFamily> metricsIndex) {
- List<MetricSetter<ServiceInstanceJVMMemory>> setterList = ImmutableList.of(
- new MetricSetter<>("jvm_memory_bytes_used", (m, v) -> m
- .setUsed(v.longValue())),
- new MetricSetter<>("jvm_memory_bytes_committed", (m, v) -> m.setCommitted(v.longValue())),
- new MetricSetter<>("jvm_memory_bytes_max", (m, v) -> m
- .setMax(v.longValue())), new MetricSetter<>("jvm_memory_bytes_init", (m, v) -> m.setInit(v.longValue()))
- );
- if (setterList.stream().anyMatch(i -> !metricsIndex.containsKey(i.name))) {
- return;
- }
- ImmutableList.of(createJVMMemory(true), createJVMMemory(false)).forEach(memory -> {
- String area = memory.isHeapStatus() ? "heap" : "nonheap";
- setterList.forEach(setter -> {
- metricsIndex.get(setter.name).samples.stream().filter(input -> {
- int index = Iterables.indexOf(input.labelNames, i -> Objects.equals(i, "area"));
- if (index < 0) {
- return false;
- }
- return Objects.equals(input.labelValues.get(index), area);
- }).findFirst().ifPresent(sample -> setter.delegated.accept(memory, sample.value));
- });
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Write {} {}-{}-{}-{} to {}", area, humanReadableByteCount(memory.getInit(), false),
- humanReadableByteCount(memory
- .getUsed(), false), humanReadableByteCount(memory.getCommitted(), false),
- humanReadableByteCount(
- memory
- .getMax(), false), memory.getName()
- );
- }
- sourceReceiver.receive(memory);
- });
- }
-
- private ServiceInstanceJVMMemory createJVMMemory(boolean isHeap) {
- ServiceInstanceJVMMemory memory = new ServiceInstanceJVMMemory();
- memory.setId(serviceInstanceId);
- memory.setName(serviceInstanceName);
- memory.setServiceId(serviceId);
- memory.setServiceName(SERVICE_NAME);
- memory.setHeapStatus(isHeap);
- memory.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
- return memory;
- }
-
- private void writeCpuUsage(Map<String, MetricFamily> metricsIndex) {
- if (!metricsIndex.containsKey("process_cpu_seconds_total")) {
- return;
- }
- double value = metricsIndex.get("process_cpu_seconds_total").samples.get(0).value;
- if (lastCpuSeconds < 0) {
- lastCpuSeconds = value;
- return;
- }
- double percentage = (value - lastCpuSeconds) * 100 / (RUN_RATE_SECONDS * Runtime.getRuntime()
- .availableProcessors());
- lastCpuSeconds = value;
- ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();
- serviceInstanceJVMCPU.setId(serviceInstanceId);
- serviceInstanceJVMCPU.setName(serviceInstanceName);
- serviceInstanceJVMCPU.setServiceId(serviceId);
- serviceInstanceJVMCPU.setServiceName(SERVICE_NAME);
- serviceInstanceJVMCPU.setUsePercent(percentage);
- serviceInstanceJVMCPU.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
- logger.debug("Write so11y cpu usage {} to {}", percentage, serviceInstanceName);
- sourceReceiver.receive(serviceInstanceJVMCPU);
- }
-
- private void register() {
- if (StringUtil.isEmpty(serviceInstanceId)) {
- logger.debug("Set up so11y service [{}].", SERVICE_NAME);
- serviceId = IDManager.ServiceID.buildId(SERVICE_NAME, NodeType.Normal);
- serviceInstanceId = IDManager.ServiceInstanceID.buildId(
- serviceId, TelemetryRelatedContext.INSTANCE.getId());
- }
- }
-
- @RequiredArgsConstructor
- private class MetricSetter<T> {
-
- final String name;
-
- final BiConsumer<T, Double> delegated;
-
- }
-
- private static String humanReadableByteCount(long bytes, boolean si) {
- int unit = si ? 1000 : 1024;
- if (bytes < unit)
- return bytes + " B";
- int exp = (int) (Math.log(bytes) / Math.log(unit));
- String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i");
- return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
- }
-
- @RequiredArgsConstructor
- @EqualsAndHashCode(of = "type")
- @ToString
- @Getter
- private class PoolMetric {
- private final MemoryPoolType type;
- private final Double value;
- }
-
- @RequiredArgsConstructor
- @EqualsAndHashCode(of = "phrase")
- @ToString
- @Getter
- private class GCMetric {
- private final GCPhrase phrase;
- private final GCMetricType type;
- private final Double value;
- }
-
- private enum GCMetricType {
- SUM, COUNT
- }
-
-}
diff --git a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
deleted file mode 100644
index 81ce655..0000000
--- a/oap-server/server-receiver-plugin/skywalking-so11y-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-org.apache.skywalking.oap.server.receiver.so11y.So11yReceiverModuleProvider
\ No newline at end of file
diff --git a/oap-server/server-telemetry/pom.xml b/oap-server/server-telemetry/pom.xml
index 6d796a1..89e07ec 100644
--- a/oap-server/server-telemetry/pom.xml
+++ b/oap-server/server-telemetry/pom.xml
@@ -30,6 +30,5 @@
<modules>
<module>telemetry-prometheus</module>
<module>telemetry-api</module>
- <module>telemetry-so11y</module>
</modules>
</project>
\ No newline at end of file
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yMetricsCollector.java b/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yMetricsCollector.java
deleted file mode 100644
index 4c78f3f..0000000
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yMetricsCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.telemetry.so11y;
-
-import io.prometheus.client.Collector;
-import io.prometheus.client.CollectorRegistry;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.skywalking.oap.server.telemetry.api.MetricFamily;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCollector;
-
-/**
- * Implement MetricCollector to generate prometheus metrics.
- */
-public class So11yMetricsCollector implements MetricsCollector {
- @Override
- public Iterable<MetricFamily> collect() {
- Enumeration<Collector.MetricFamilySamples> mfs = CollectorRegistry.defaultRegistry.metricFamilySamples();
- List<MetricFamily> result = new LinkedList<>();
- while (mfs.hasMoreElements()) {
- Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
- List<MetricFamily.Sample> samples = new ArrayList<>(metricFamilySamples.samples.size());
- MetricFamily m = new MetricFamily(metricFamilySamples.name, MetricFamily.Type.valueOf(metricFamilySamples.type
- .name()), metricFamilySamples.help, samples);
- result.add(m);
- for (Collector.MetricFamilySamples.Sample sample : metricFamilySamples.samples) {
- samples.add(new MetricFamily.Sample(sample.name, sample.labelNames, sample.labelValues, sample.value, sample.timestampMs));
- }
- }
- return result;
- }
-}
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yMetricsCreator.java b/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yMetricsCreator.java
deleted file mode 100644
index 1bd8520..0000000
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yMetricsCreator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.telemetry.so11y;
-
-import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
-import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
-import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.apache.skywalking.oap.server.telemetry.prometheus.PrometheusMetricsCreator;
-
-/**
- * Delegate prometheus metrics creator.
- */
-public class So11yMetricsCreator extends PrometheusMetricsCreator {
-
- @Override
- public CounterMetrics createCounter(String name, String tips, MetricsTag.Keys tagKeys,
- MetricsTag.Values tagValues) {
- return super.createCounter(name, tips, tagKeys, tagValues);
- }
-
- @Override
- public GaugeMetrics createGauge(String name, String tips, MetricsTag.Keys tagKeys, MetricsTag.Values tagValues) {
- return super.createGauge(name, tips, tagKeys, tagValues);
- }
-
- @Override
- public HistogramMetrics createHistogramMetric(String name, String tips, MetricsTag.Keys tagKeys,
- MetricsTag.Values tagValues, double... buckets) {
- return super.createHistogramMetric(name, tips, tagKeys, tagValues, buckets);
- }
-}
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yTelemetryProvider.java b/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yTelemetryProvider.java
deleted file mode 100644
index a618a8a..0000000
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/java/org/apache/skywalking/oap/server/telemetry/so11y/So11yTelemetryProvider.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.telemetry.so11y;
-
-import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.DefaultExports;
-import java.io.IOException;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCollector;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
-
-/**
- * Self observability telemetry provider.
- */
-public class So11yTelemetryProvider extends ModuleProvider {
- private So11yConfig config;
-
- public So11yTelemetryProvider() {
- config = new So11yConfig();
- }
-
- @Override
- public String name() {
- return "so11y";
- }
-
- @Override
- public Class<? extends ModuleDefine> module() {
- return TelemetryModule.class;
- }
-
- @Override
- public ModuleConfig createConfigBeanIfAbsent() {
- return config;
- }
-
- @Override
- public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- this.registerServiceImplementation(MetricsCreator.class, new So11yMetricsCreator());
- this.registerServiceImplementation(MetricsCollector.class, new So11yMetricsCollector());
- if (config.isPrometheusExporterEnabled()) {
- try {
- new HTTPServer(config.getPrometheusExporterHost(), config.getPrometheusExporterPort());
- } catch (IOException e) {
- throw new ModuleStartException(e.getMessage(), e);
- }
- }
- DefaultExports.initialize();
- }
-
- @Override
- public void start() throws ServiceNotProvidedException, ModuleStartException {
-
- }
-
- @Override
- public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- }
-
- @Override
- public String[] requiredModules() {
- return new String[0];
- }
-}
diff --git a/oap-server/server-telemetry/telemetry-so11y/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-telemetry/telemetry-so11y/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
deleted file mode 100644
index fed0b08..0000000
--- a/oap-server/server-telemetry/telemetry-so11y/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-
-org.apache.skywalking.oap.server.telemetry.so11y.So11yTelemetryProvider
diff --git a/tools/dependencies/known-oap-backend-dependencies-es7.txt b/tools/dependencies/known-oap-backend-dependencies-es7.txt
index fc75930..42df098 100755
--- a/tools/dependencies/known-oap-backend-dependencies-es7.txt
+++ b/tools/dependencies/known-oap-backend-dependencies-es7.txt
@@ -161,3 +161,5 @@ influxdb-java-2.15.jar
logging-interceptor-3.13.1.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
+vavr-0.10.3.jar
+vavr-match-0.10.3.jar
diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt
index 087e011..42620cb 100755
--- a/tools/dependencies/known-oap-backend-dependencies.txt
+++ b/tools/dependencies/known-oap-backend-dependencies.txt
@@ -160,3 +160,5 @@ influxdb-java-2.15.jar
logging-interceptor-3.13.1.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
+vavr-0.10.3.jar
+vavr-match-0.10.3.jar