Posted to notifications@skywalking.apache.org by li...@apache.org on 2022/11/05 00:08:35 UTC
[skywalking] branch master updated: Support the telegraf receiver plugin module (#9620)
This is an automated email from the ASF dual-hosted git repository.
liuhaoyangzz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 42f3396b64 Support the telegraf receiver plugin module (#9620)
42f3396b64 is described below
commit 42f3396b647f331a29f7a4deef427ba2fbbd284d
Author: soander <Wa...@outlook.com>
AuthorDate: Fri Nov 4 17:08:20 2022 -0700
Support the telegraf receiver plugin module (#9620)
* The telegraf receiver plugin module development.
* Refactored code of converting telegraf data and fixed some errors.
* Support Telegraf receiver plugin module.
* Add telegraf-receiver.md, update changes.md and vm-monitoring.md.
* Rename YAML file and change code style.
* Change receiver-telegraf of application.yml.
* The receiver-telegraf e2e test.
* The telegraf receiver e2e test.
* Fix an issue and delete redundant configs.
* Add Unit Test about converting Telegraf metrics.
* Adjust Unit Test about converting Telegraf metrics.
* Add License Header to Unit Test and change binary.xml.
* Change module provider's config initialization mechanism.
* Exclude the telegraf-rules in server-starter pom.xml.
* Fix telegraf e2e test issues.
* Change telegraf e2e test.
* Fix issues
* Fix issues.
* Add Sample convert Unit Test.
* Change vm.yaml, related documents and fix some issues.
* Change backend-vm-monitoring.md.
* Fix SampleConvertTest checkstyle issue.
* Change vm.yaml swap MAL.
* Update menu.yml and binary.xml.
* Update Telegraf Unit test.
* Delete telegraf config package, use meter.analyzer.prometheus package to load config file.
* Reorder telegraf metrics in menu.yml.
* Change e2e, vm, config, linux-service and vm.md.
* Update telegraf.conf file.
* Change url of telegraf.conf file.
* Update e2e.yaml, conf file, menu.yml and delete useless code of provider.
* Update grouping sampleFamily by timestamp and name, and add new UTs.
* Update vm-monitoring.md and .asf.yaml.
Co-authored-by: Superskyyy (ONLINE) <Su...@outlook.com>
Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
.asf.yaml | 1 +
.github/workflows/skywalking.yaml | 2 +
apm-dist/src/main/assembly/binary.xml | 1 +
docs/en/setup/backend/backend-vm-monitoring.md | 48 +-
docs/en/setup/backend/telegraf-receiver.md | 45 ++
docs/menu.yml | 2 +
oap-server/server-receiver-plugin/pom.xml | 1 +
.../skywalking-telegraf-receiver-plugin/pom.xml | 44 ++
.../telegraf/module/TelegrafReceiverModule.java | 33 ++
.../telegraf/provider/TelegrafModuleConfig.java | 37 ++
.../provider/TelegrafReceiverProvider.java | 106 +++++
.../provider/handler/TelegrafServiceHandler.java | 140 ++++++
.../provider/handler/pojo/TelegrafData.java | 52 +++
.../provider/handler/pojo/TelegrafDatum.java | 37 ++
...ywalking.oap.server.library.module.ModuleDefine | 19 +
...alking.oap.server.library.module.ModuleProvider | 19 +
.../receiver/telegraf/TelegrafMetricsTest.java | 490 +++++++++++++++++++++
.../receiver/telegraf/mock/MockModuleManager.java | 53 +++
.../receiver/telegraf/mock/MockModuleProvider.java | 47 ++
.../src/test/resources/telegraf-rules/vm.yaml | 72 +++
oap-server/server-starter/pom.xml | 6 +
.../src/main/resources/application.yml | 5 +
.../src/main/resources/telegraf-rules/vm.yaml | 72 +++
test/e2e-v2/cases/vm/telegraf/docker-compose.yml | 40 ++
test/e2e-v2/cases/vm/telegraf/e2e.yaml | 53 +++
test/e2e-v2/cases/vm/telegraf/telegraf.conf | 152 +++++++
26 files changed, 1561 insertions(+), 16 deletions(-)
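
As a reading aid for the diff below, here is a minimal, self-contained sketch of the conversion that `TelegrafServiceHandler` appears to perform. Each numeric field becomes a sample named `<metric>_<field>`, second-resolution timestamps are scaled to milliseconds, and samples are grouped by timestamp and then by name. The `TelegrafGroupingSketch` class and its nested `Sample` class are hypothetical stand-ins rather than the SkyWalking `Sample` and `SampleFamily` types, and the input values are invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TelegrafGroupingSketch {

    /** Hypothetical stand-in for a meter sample; not the SkyWalking Sample class. */
    static final class Sample {
        final String name;
        final long timestampMillis;
        final double value;

        Sample(String name, long timestampMillis, double value) {
            this.name = name;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /** Flatten one metric: every numeric field becomes a sample named "<metric>_<field>". */
    static List<Sample> toSamples(String metric, long timestampSeconds, Map<String, Object> fields) {
        List<Sample> samples = new ArrayList<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (e.getValue() instanceof Number) {
                double value = ((Number) e.getValue()).doubleValue();
                // Seconds are scaled to milliseconds, as in the handler.
                samples.add(new Sample(metric + "_" + e.getKey(), timestampSeconds * 1000L, value));
            }
        }
        return samples;
    }

    /** Group by timestamp first, then by sample name within each timestamp. */
    static Map<Long, Map<String, List<Sample>>> group(List<Sample> samples) {
        Map<Long, Map<String, List<Sample>>> grouped = new TreeMap<>();
        for (Sample s : samples) {
            grouped.computeIfAbsent(s.timestampMillis, k -> new TreeMap<>())
                   .computeIfAbsent(s.name, k -> new ArrayList<>())
                   .add(s);
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, Object> cpu = new LinkedHashMap<>();
        cpu.put("usage_idle", 90.5);
        cpu.put("note", "text"); // non-numeric field, skipped
        Map<String, Object> mem = new LinkedHashMap<>();
        mem.put("used", 1024L);

        List<Sample> all = new ArrayList<>();
        all.addAll(toSamples("cpu", 1667606900L, cpu));
        all.addAll(toSamples("mem", 1667606900L, mem));
        all.addAll(toSamples("mem", 1667606910L, mem));

        group(all).forEach((ts, byName) -> System.out.println(ts + " -> " + byName.keySet()));
    }
}
```

Running `main` prints one line per timestamp: `1667606900000 -> [cpu_usage_idle, mem_used]` followed by `1667606910000 -> [mem_used]`. The sketch uses sorted maps only to make that output deterministic; the handler in the diff does not depend on ordering.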
diff --git a/.asf.yaml b/.asf.yaml
index 275f60485f..3820cd5524 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -33,6 +33,7 @@ github:
- open-telemetry
- zabbix
- ebpf
+ - telegraf
enabled_merge_buttons:
squash: true
merge: false
diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index 3bd49516a8..6341f7260b 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -549,6 +549,8 @@ jobs:
config: test/e2e-v2/cases/vm/zabbix/e2e.yaml
- name: VM Prometheus
config: test/e2e-v2/cases/vm/prometheus-node-exporter/e2e.yaml
+ - name: VM Telegraf
+ config: test/e2e-v2/cases/vm/telegraf/e2e.yaml
- name: So11y
config: test/e2e-v2/cases/so11y/e2e.yaml
- name: MySQL Prometheus and slowsql
diff --git a/apm-dist/src/main/assembly/binary.xml b/apm-dist/src/main/assembly/binary.xml
index cef37bf39f..259f721272 100644
--- a/apm-dist/src/main/assembly/binary.xml
+++ b/apm-dist/src/main/assembly/binary.xml
@@ -69,6 +69,7 @@
<include>ui-initialized-templates/*/*.json</include>
<include>lal/*</include>
<include>log-mal-rules/*</include>
+ <include>telegraf-rules/*</include>
</includes>
<outputDirectory>config</outputDirectory>
</fileSet>
diff --git a/docs/en/setup/backend/backend-vm-monitoring.md b/docs/en/setup/backend/backend-vm-monitoring.md
index 45589b3588..9edf8d1413 100644
--- a/docs/en/setup/backend/backend-vm-monitoring.md
+++ b/docs/en/setup/backend/backend-vm-monitoring.md
@@ -3,38 +3,54 @@ SkyWalking leverages Prometheus node-exporter to collect metrics data from the V
[OpenTelemetry receiver](opentelemetry-receiver.md) and into the [Meter System](./../../concepts-and-designs/meter.md).
VM entity as a `Service` in OAP and on the `Layer: OS_LINUX`.
+SkyWalking also supports InfluxDB Telegraf for collecting VMs' metrics data through the [Telegraf receiver](./telegraf-receiver.md).
+The Telegraf receiver plugin receives, processes, and converts the metrics, then sends the converted metrics to the [Meter System](./../../concepts-and-designs/meter.md).
+VM entity as a `Service` in OAP and on the `Layer: OS_LINUX`.
+
## Data flow
+**For OpenTelemetry receiver:**
1. The Prometheus node-exporter collects metrics data from the VMs.
2. The OpenTelemetry Collector fetches metrics from node-exporter via Prometheus Receiver and pushes metrics to the SkyWalking OAP Server via the OpenCensus gRPC Exporter or OpenTelemetry gRPC exporter.
3. The SkyWalking OAP Server parses the expression with [MAL](../../concepts-and-designs/mal.md) to filter/calculate/aggregate and store the results.
+**For Telegraf receiver:**
+1. The InfluxDB Telegraf [input plugins](https://docs.influxdata.com/telegraf/v1.24/plugins/) collect various metrics data from the VMs.
+2. The cpu, mem, system, disk and diskio input plugins should be set in the `telegraf.conf` file.
+3. InfluxDB Telegraf sends `JSON` format metrics in `HTTP` messages to the Telegraf receiver, which then pushes the converted metrics to the SkyWalking OAP Server [Meter System](./../../concepts-and-designs/meter.md).
+4. The SkyWalking OAP Server parses the expression with [MAL](../../concepts-and-designs/mal.md) to filter/calculate/aggregate and store the results.
+5. For the Telegraf receiver, the `meter_vm_cpu_average_used` metric indicates the average usage of each CPU core.
## Setup
-
+**For OpenTelemetry receiver:**
1. Setup [Prometheus node-exporter](https://prometheus.io/docs/guides/node-exporter/).
2. Setup [OpenTelemetry Collector ](https://opentelemetry.io/docs/collector/). This is an example for OpenTelemetry Collector configuration [otel-collector-config.yaml](../../../../test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml).
3. Config SkyWalking [OpenTelemetry receiver](opentelemetry-receiver.md).
+**For Telegraf receiver:**
+1. Set up InfluxDB Telegraf's `telegraf.conf` file according to the [official Telegraf documentation](https://docs.influxdata.com/telegraf/v1.24/).
+2. Apply the receiver-specific rules to InfluxDB Telegraf's `telegraf.conf` file according to the [Telegraf receiver document](telegraf-receiver.md).
+3. Configure the SkyWalking [Telegraf receiver](telegraf-receiver.md).
+
## Supported Metrics
-| Monitoring Panel | Unit | Metric Name | Description | Data Source |
-|-----|-----|-----|-----|-----|
-| CPU Usage | % | cpu_total_percentage | The total percentage usage of the CPU core. If there are 2 cores, the maximum usage is 200%. | Prometheus node-exporter |
-| Memory RAM Usage | MB | meter_vm_memory_used | The total RAM usage | Prometheus node-exporter |
-| Memory Swap Usage | % | meter_vm_memory_swap_percentage | The percentage usage of swap memory | Prometheus node-exporter |
-| CPU Average Used | % | meter_vm_cpu_average_used | The percentage usage of the CPU core in each mode | Prometheus node-exporter |
-| CPU Load | | meter_vm_cpu_load1<br />meter_vm_cpu_load5<br />meter_vm_cpu_load15 | The CPU 1m / 5m / 15m average load | Prometheus node-exporter |
-| Memory RAM | MB | meter_vm_memory_total<br />meter_vm_memory_available<br />meter_vm_memory_used | The RAM statistics, including Total / Available / Used | Prometheus node-exporter |
-| Memory Swap | MB | meter_vm_memory_swap_free<br />meter_vm_memory_swap_total | Swap memory statistics, including Free / Total | Prometheus node-exporter |
-| File System Mountpoint Usage | % | meter_vm_filesystem_percentage | The percentage usage of the file system at each mount point | Prometheus node-exporter |
-| Disk R/W | KB/s | meter_vm_disk_read,meter_vm_disk_written | The disk read and written | Prometheus node-exporter |
-| Network Bandwidth Usage | KB/s | meter_vm_network_receive<br />meter_vm_network_transmit | The network receive and transmit | Prometheus node-exporter |
-| Network Status | | meter_vm_tcp_curr_estab<br />meter_vm_tcp_tw<br />meter_vm_tcp_alloc<br />meter_vm_sockets_used<br />meter_vm_udp_inuse | The number of TCPs established / TCP time wait / TCPs allocated / sockets in use / UDPs in use | Prometheus node-exporter |
-| Filefd Allocated | | meter_vm_filefd_allocated | The number of file descriptors allocated | Prometheus node-exporter |
+| Monitoring Panel | Unit | Metric Name | Description | Data Source |
+|------------------------------|------|-------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------|-----------------------------------------------------|
+| CPU Usage | % | meter_vm_cpu_total_percentage | The total percentage usage of the CPU core. If there are 2 cores, the maximum usage is 200%. | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory RAM Usage | MB | meter_vm_memory_used | The total RAM usage | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory Swap Usage | % | meter_vm_memory_swap_percentage | The percentage usage of swap memory | Prometheus node-exporter<br />Telegraf input plugin |
+| CPU Average Used | % | meter_vm_cpu_average_used | The percentage usage of the CPU core in each mode | Prometheus node-exporter<br />Telegraf input plugin |
+| CPU Load | | meter_vm_cpu_load1<br />meter_vm_cpu_load5<br />meter_vm_cpu_load15 | The CPU 1m / 5m / 15m average load | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory RAM | MB | meter_vm_memory_total<br />meter_vm_memory_available<br />meter_vm_memory_used | The RAM statistics, including Total / Available / Used | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory Swap | MB | meter_vm_memory_swap_free<br />meter_vm_memory_swap_total | Swap memory statistics, including Free / Total | Prometheus node-exporter<br />Telegraf input plugin |
+| File System Mountpoint Usage | % | meter_vm_filesystem_percentage | The percentage usage of the file system at each mount point | Prometheus node-exporter<br />Telegraf input plugin |
+| Disk R/W | KB/s | meter_vm_disk_read,meter_vm_disk_written | The disk read and written | Prometheus node-exporter<br />Telegraf input plugin |
+| Network Bandwidth Usage | KB/s | meter_vm_network_receive<br />meter_vm_network_transmit | The network receive and transmit | Prometheus node-exporter<br />Telegraf input plugin |
+| Network Status | | meter_vm_tcp_curr_estab<br />meter_vm_tcp_tw<br />meter_vm_tcp_alloc<br />meter_vm_sockets_used<br />meter_vm_udp_inuse | The number of TCPs established / TCP time wait / TCPs allocated / sockets in use / UDPs in use | Prometheus node-exporter<br />Telegraf input plugin |
+| Filefd Allocated | | meter_vm_filefd_allocated | The number of file descriptors allocated | Prometheus node-exporter |
## Customizing
You can customize your own metrics/expression/dashboard panel.
-The metrics definition and expression rules are found in `/config/otel-rules/vm.yaml`.
+The metrics definition and expression rules are found in `/config/otel-rules/vm.yaml` and `/config/telegraf-rules/vm.yaml`.
The dashboard panel confirmations are found in `/config/ui-initialized-templates/os_linux`.
## Blog
diff --git a/docs/en/setup/backend/telegraf-receiver.md b/docs/en/setup/backend/telegraf-receiver.md
new file mode 100644
index 0000000000..9512ee8704
--- /dev/null
+++ b/docs/en/setup/backend/telegraf-receiver.md
@@ -0,0 +1,45 @@
+# Telegraf receiver
+
+The Telegraf receiver supports receiving InfluxDB Telegraf's metrics by meter-system.
+The OAP can load the configuration at bootstrap. The files are located at `$CLASSPATH/telegraf-rules`.
+If the new configuration is not well-formed, the OAP may fail to start up.
+
+See the [InfluxDB Telegraf](https://docs.influxdata.com/telegraf/v1.24/) documentation for details on Telegraf itself.
+The Telegraf receiver can handle Telegraf's [CPU Input Plugin](https://github.com/influxdata/telegraf/blob/release-1.24/plugins/inputs/cpu/README.md)
+and [Memory Input Plugin](https://github.com/influxdata/telegraf/blob/release-1.24/plugins/inputs/mem/README.md).
+
+Many other Telegraf input plugins exist, and users can write their own rule files for them.
+The rule file should be in YAML format, defined by the scheme described in [MAL](../../concepts-and-designs/mal.md).
+Please see the [telegraf plugin directory](https://docs.influxdata.com/telegraf/v1.24/plugins/) for more information on input plugins.
+
+**Notice:**
+* The Telegraf receiver module uses `HTTP` to receive Telegraf's metrics,
+so the output plugin should be set to `[[outputs.http]]` in the `telegraf.conf` file.
+Please see the [http outputs](https://github.com/influxdata/telegraf/blob/release-1.24/plugins/outputs/http/README.md)
+for more details.
+
+* The Telegraf receiver module **only** processes Telegraf's `JSON` metrics format,
+so the data format should be set with `data_format = "json"` in the `telegraf.conf` file.
+Please see the [JSON data format](https://docs.influxdata.com/telegraf/v1.24/data_formats/output/json/)
+for more details.
+
+* The default `json_timestamp_units` in JSON output is one second,
+and the Telegraf receiver module **only** processes timestamps in seconds.
+If users configure `json_timestamp_units` in the `telegraf.conf` file, it should be set to `json_timestamp_units = "1s"`.
+Please see the [JSON data format](https://docs.influxdata.com/telegraf/v1.24/data_formats/output/json/)
+for more details.
+
+The following is the default Telegraf receiver configuration in `application.yml`.
+Set `SW_RECEIVER_TELEGRAF:default` through the system environment, or change `SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm`,
+to activate the Telegraf receiver with `vm.yaml` in telegraf-rules.
+```yaml
+receiver-telegraf:
+ selector: ${SW_RECEIVER_TELEGRAF:default}
+ default:
+ activeFiles: ${SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm}
+```
+
+| Rule Name | Description | Configuration File | Data Source |
+|-----------|----------------|------------------------|-------------------------------------------------------------------------|
+| vm | Metrics of VMs | telegraf-rules/vm.yaml | Telegraf inputs plugins --> Telegraf Receiver --> SkyWalking OAP Server |
+
diff --git a/docs/menu.yml b/docs/menu.yml
index 220fba39bd..2f85c091c4 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -119,6 +119,8 @@ catalog:
path: "/en/setup/backend/backend-zabbix"
- name: "Meter Analysis"
path: "/en/setup/backend/backend-meter"
+ - name: "Telegraf Metrics"
+ path: "/en/setup/backend/telegraf-receiver"
- name: "Apdex Threshold"
path: "/en/setup/backend/apdex-threshold"
- name: "Spring Sleuth Metrics Analysis"
diff --git a/oap-server/server-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/pom.xml
index 7255194f08..1a723f6a86 100644
--- a/oap-server/server-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/pom.xml
@@ -46,6 +46,7 @@
<module>skywalking-event-receiver-plugin</module>
<module>skywalking-zabbix-receiver-plugin</module>
<module>skywalking-ebpf-receiver-plugin</module>
+ <module>skywalking-telegraf-receiver-plugin</module>
</modules>
<dependencies>
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/pom.xml
new file mode 100644
index 0000000000..b1a6144c1a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>server-receiver-plugin</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>9.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>skywalking-telegraf-receiver-plugin</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>agent-analyzer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>skywalking-sharing-server-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/module/TelegrafReceiverModule.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/module/TelegrafReceiverModule.java
new file mode 100644
index 0000000000..8e46f74d03
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/module/TelegrafReceiverModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.module;
+
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+
+public class TelegrafReceiverModule extends ModuleDefine {
+
+ public TelegrafReceiverModule() {
+ super("receiver-telegraf");
+ }
+
+ @Override
+ public Class[] services() {
+ return new Class[0];
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafModuleConfig.java
new file mode 100644
index 0000000000..40c40a8e3a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafModuleConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+@Setter
+@Getter
+public class TelegrafModuleConfig extends ModuleConfig {
+
+ public static final String CONFIG_PATH = "telegraf-rules";
+
+ /**
+ * active receive configs, files split by ","
+ */
+ private String activeFiles = Const.EMPTY_STRING;
+
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafReceiverProvider.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafReceiverProvider.java
new file mode 100644
index 0000000000..871294afa9
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafReceiverProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider;
+
+import com.google.common.base.Splitter;
+import com.linecorp.armeria.common.HttpMethod;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
+import org.apache.skywalking.oap.server.receiver.telegraf.module.TelegrafReceiverModule;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.TelegrafServiceHandler;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class TelegrafReceiverProvider extends ModuleProvider {
+ private List<Rule> configs;
+ private TelegrafModuleConfig moduleConfig;
+
+ @Override
+ public String name() {
+ return "default";
+ }
+
+ @Override
+ public Class<? extends ModuleDefine> module() {
+ return TelegrafReceiverModule.class;
+ }
+
+ @Override
+ public ConfigCreator newConfigCreator() {
+ return new ConfigCreator<TelegrafModuleConfig>() {
+ @Override
+ public Class type() {
+ return TelegrafModuleConfig.class;
+ }
+
+ @Override
+ public void onInitialized(final TelegrafModuleConfig initialized) {
+ moduleConfig = initialized;
+ }
+ };
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ try {
+ configs = Rules.loadRules(TelegrafModuleConfig.CONFIG_PATH,
+ StringUtil.isEmpty(moduleConfig.getActiveFiles()) ? Collections.emptyList() : Splitter.on(",").splitToList(moduleConfig.getActiveFiles()));
+ } catch (IOException e) {
+ throw new ModuleStartException("Failed to load MAL rules", e);
+ }
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ if (CollectionUtils.isNotEmpty(configs)) {
+ HTTPHandlerRegister httpHandlerRegister = getManager().find(SharingServerModule.NAME)
+ .provider()
+ .getService(HTTPHandlerRegister.class);
+ MeterSystem meterSystem = getManager().find(CoreModule.NAME)
+ .provider()
+ .getService(MeterSystem.class);
+ httpHandlerRegister.addHandler(new TelegrafServiceHandler(getManager(), meterSystem, configs),
+ Collections.singletonList(HttpMethod.POST));
+ }
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException {
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[] {
+ CoreModule.NAME,
+ SharingServerModule.NAME
+ };
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/TelegrafServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/TelegrafServiceHandler.java
new file mode 100644
index 0000000000..cd4dde4170
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/TelegrafServiceHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider.handler;
+
+import com.google.common.collect.ImmutableMap;
+import com.linecorp.armeria.server.annotation.Post;
+import com.linecorp.armeria.server.annotation.RequestConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.common.v3.Commands;
+import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafData;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafDatum;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TelegrafServiceHandler {
+
+ private final HistogramMetrics histogram;
+ private final CounterMetrics errorCounter;
+ private List<MetricConvert> metricConvert;
+
+ public TelegrafServiceHandler(ModuleManager moduleManager, MeterSystem meterSystem, List<Rule> rules) {
+
+ this.metricConvert = rules.stream().map(r -> new MetricConvert(r, meterSystem)).collect(Collectors.toList());
+
+ final MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
+ .provider()
+ .getService(MetricsCreator.class);
+
+ histogram = metricsCreator.createHistogramMetric(
+ "telegraf_in_latency", "The process latency of telegraf data",
+ new MetricsTag.Keys("protocol"), new MetricsTag.Values("http")
+ );
+
+ errorCounter = metricsCreator.createCounter(
+ "telegraf_error_count", "The error number of telegraf analysis",
+ new MetricsTag.Keys("protocol"), new MetricsTag.Values("http")
+ );
+ }
+
+ /**
+ * Convert a single {@link TelegrafDatum} to meter {@link Sample}s, one per numeric field
+ **/
+ public List<Sample> convertTelegraf(TelegrafDatum telegrafData) {
+
+ List<Sample> sampleList = new ArrayList<>();
+
+ Map<String, Object> fields = telegrafData.getFields();
+ String name = telegrafData.getName();
+ Map<String, String> tags = telegrafData.getTags();
+ ImmutableMap<String, String> immutableTags = ImmutableMap.copyOf(tags);
+ long timestamp = telegrafData.getTimestamp();
+
+ fields.forEach((key, value) -> {
+ if (value instanceof Number) {
+ Sample.SampleBuilder builder = Sample.builder();
+ Sample sample = builder.name(name + "_" + key)
+ .timestamp(timestamp * 1000L)
+ .value(((Number) value).doubleValue())
+ .labels(immutableTags).build();
+
+ sampleList.add(sample);
+ }
+ });
+ return sampleList;
+
+ }
+
+ public List<ImmutableMap<String, SampleFamily>> convertSampleFamily(TelegrafData telegrafData) {
+ List<Sample> allSamples = new ArrayList<>();
+
+ List<TelegrafDatum> metrics = telegrafData.getMetrics();
+ for (TelegrafDatum m : metrics) {
+ List<Sample> samples = convertTelegraf(m);
+ allSamples.addAll(samples);
+ }
+
+ List<ImmutableMap<String, SampleFamily>> res = new ArrayList<>();
+
+ // Grouping all samples by timestamp
+ Map<Long, List<Sample>> sampleFamilyByTime = allSamples.stream()
+ .collect(Collectors.groupingBy(Sample::getTimestamp));
+
+ // Grouping all samples with the same timestamp by name
+ for (List<Sample> s : sampleFamilyByTime.values()) {
+ ImmutableMap.Builder<String, SampleFamily> builder = ImmutableMap.builder();
+ Map<String, List<Sample>> sampleFamilyByName = s.stream()
+ .collect(Collectors.groupingBy(Sample::getName));
+ sampleFamilyByName.forEach((k, v) ->
+ builder.put(k, SampleFamilyBuilder.newBuilder(v.toArray(new Sample[0])).build()));
+ res.add(builder.build());
+ }
+
+ return res;
+ }
+
+ @Post("/telegraf")
+ @RequestConverter(TelegrafData.class)
+ public Commands collectData(TelegrafData telegrafData) {
+ try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
+ List<ImmutableMap<String, SampleFamily>> sampleFamily = convertSampleFamily(telegrafData);
+ sampleFamily.forEach(s -> metricConvert.forEach(m -> m.toMeter(s)));
+ } catch (Exception e) {
+ errorCounter.inc();
+ log.error(e.getMessage(), e);
+ }
+ return Commands.newBuilder().build();
+ }
+}
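The two-level grouping in convertSampleFamily above (first by timestamp, then by name inside each timestamp bucket) can be illustrated without any SkyWalking dependency. The following is only a minimal sketch: SampleGroupingSketch, SimpleSample and group are hypothetical names invented for this illustration, and plain lists stand in for the real SampleFamily type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SampleGroupingSketch {

    /** Hypothetical stand-in for the meter Sample; not the SkyWalking class. */
    public static final class SimpleSample {
        public final String name;
        public final long timestampMillis;
        public final double value;

        public SimpleSample(String name, long timestampMillis, double value) {
            this.name = name;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /** Groups samples by timestamp, then by name within each timestamp bucket. */
    public static List<Map<String, List<SimpleSample>>> group(List<SimpleSample> samples) {
        Map<Long, List<SimpleSample>> byTime = samples.stream()
            .collect(Collectors.groupingBy(s -> s.timestampMillis));

        List<Map<String, List<SimpleSample>>> result = new ArrayList<>();
        for (List<SimpleSample> bucket : byTime.values()) {
            // One map per timestamp: sample name -> all samples sharing that name.
            result.add(bucket.stream().collect(Collectors.groupingBy(s -> s.name)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<SimpleSample> samples = Arrays.asList(
            new SimpleSample("cpu_usage_idle", 1663391390000L, 95.3),
            new SimpleSample("cpu_usage_idle", 1663391390000L, 67.3),
            new SimpleSample("cpu_usage_user", 1663391390000L, 2.6),
            new SimpleSample("cpu_usage_idle", 1453365320000L, 45.3));

        List<Map<String, List<SimpleSample>>> grouped = group(samples);
        // Two distinct timestamps, so two maps are produced.
        System.out.println("timestamp buckets: " + grouped.size());
    }
}
```

With this shape, samples that share both a timestamp and a name end up in the same family, which is what testMultipleCpuMetricsWithSameTimestamp below relies on when it expects 4 families of 4 samples each.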
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafData.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafData.java
new file mode 100644
index 0000000000..01db226aa6
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linecorp.armeria.common.AggregatedHttpRequest;
+import com.linecorp.armeria.common.annotation.Nullable;
+import com.linecorp.armeria.server.ServiceRequestContext;
+import com.linecorp.armeria.server.annotation.RequestConverterFunction;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TelegrafData implements RequestConverterFunction {
+ private List<TelegrafDatum> metrics;
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Override
+ public @Nullable Object convertRequest(ServiceRequestContext ctx, AggregatedHttpRequest request, Class<?> expectedResultType,
+ @Nullable ParameterizedType expectedParameterizedResultType) throws Exception {
+
+ if (expectedResultType == TelegrafData.class) {
+ // Convert the request to a TelegrafData object
+ return MAPPER.readValue(request.contentUtf8(), TelegrafData.class);
+ }
+ return RequestConverterFunction.fallthrough();
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafDatum.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafDatum.java
new file mode 100644
index 0000000000..7a53f4e7d7
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafDatum.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TelegrafDatum {
+ private Map<String, Object> fields;
+ private String name;
+ private Map<String, String> tags;
+ private long timestamp;
+}
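As a rough illustration of how one TelegrafDatum becomes several samples in TelegrafServiceHandler.convertTelegraf, the sketch below flattens a field map into "name_field" entries and skips non-numeric values. It is a standalone approximation, not the plugin code: DatumFlattenSketch, flatten and toMillis are hypothetical names, and the "state" field is made-up input. The multiplication by 1000 mirrors the handler, which implies the incoming timestamp is in seconds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DatumFlattenSketch {

    /** Maps each numeric field to "<name>_<field>" -> value; other field types are ignored. */
    public static Map<String, Double> flatten(String name, Map<String, Object> fields) {
        Map<String, Double> samples = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (e.getValue() instanceof Number) {
                samples.put(name + "_" + e.getKey(), ((Number) e.getValue()).doubleValue());
            }
        }
        return samples;
    }

    /** Converts a timestamp in seconds to milliseconds, as the handler does. */
    public static long toMillis(long timestampSeconds) {
        return timestampSeconds * 1000L;
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("used_percent", 64.58784929499433);
        fields.put("total", 17078149120L);
        fields.put("state", "ok"); // hypothetical non-numeric field, skipped below

        // Two entries survive: mem_used_percent and mem_total.
        System.out.println(flatten("mem", fields).keySet());
        System.out.println(toMillis(1663391390L));
    }
}
```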
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
new file mode 100644
index 0000000000..1aa67acc02
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.telegraf.module.TelegrafReceiverModule
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000000..4fb8e36621
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.telegraf.provider.TelegrafReceiverProvider
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/TelegrafMetricsTest.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/TelegrafMetricsTest.java
new file mode 100644
index 0000000000..98c8132409
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/TelegrafMetricsTest.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.CoreModuleProvider;
+import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.telegraf.mock.MockModuleManager;
+import org.apache.skywalking.oap.server.receiver.telegraf.mock.MockModuleProvider;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.TelegrafModuleConfig;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.TelegrafServiceHandler;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafData;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafDatum;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.powermock.reflect.Whitebox;
+import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonParseException;
+import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class TelegrafMetricsTest {
+
+ protected CoreModuleProvider moduleProvider;
+ protected ModuleManager moduleManager;
+ protected MeterSystem meterSystem;
+ protected TelegrafServiceHandler telegrafServiceHandler;
+
+ private List<AcceptableValue> values = new ArrayList<>();
+
+ @BeforeClass
+ public static void setup() {
+ MeterEntity.setNamingControl(
+ new NamingControl(512, 512, 512, new EndpointNameGrouping()));
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ MeterEntity.setNamingControl(null);
+ }
+
+ @Before
+ public void setupMetrics() throws Throwable {
+ moduleProvider = Mockito.mock(CoreModuleProvider.class);
+ moduleManager = new MockModuleManager() {
+ @Override
+ protected void init() {
+ register(CoreModule.NAME, () -> new MockModuleProvider() {
+ @Override
+ protected void register() {
+ registerServiceImplementation(NamingControl.class, new NamingControl(
+ 512, 512, 512, new EndpointNameGrouping()));
+ }
+ });
+ register(TelemetryModule.NAME, () -> new MockModuleProvider() {
+ @Override
+ protected void register() {
+ registerServiceImplementation(MetricsCreator.class, new MetricsCreatorNoop());
+ }
+ });
+ }
+ };
+
+ // prepare the context
+ meterSystem = Mockito.mock(MeterSystem.class);
+ Whitebox.setInternalState(MetricsStreamProcessor.class, "PROCESSOR",
+ Mockito.spy(MetricsStreamProcessor.getInstance()));
+ doNothing().when(MetricsStreamProcessor.getInstance()).create(any(), (StreamDefinition) any(), any());
+ CoreModule coreModule = Mockito.spy(CoreModule.class);
+
+ Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider);
+ when(moduleProvider.getService(MeterSystem.class)).thenReturn(meterSystem);
+
+ telegrafServiceHandler = buildTelegrafServiceHandler();
+ }
+
+ protected TelegrafServiceHandler buildTelegrafServiceHandler() throws Exception {
+ // Record every value that the mocked meter system receives
+ doAnswer(invocationOnMock -> {
+ values.add(invocationOnMock.getArgument(0, AcceptableValue.class));
+ return null;
+ }).when(meterSystem).doStreamingCalculation(any());
+
+ // load context
+ List<Rule> telegrafConfigs = Rules.loadRules(TelegrafModuleConfig.CONFIG_PATH, Arrays.asList("vm"));
+ return new TelegrafServiceHandler(moduleManager, meterSystem, telegrafConfigs);
+ }
+
+ private TelegrafData assertTelegrafJSONConvert(String jsonMessage) throws Exception {
+ Assert.assertNotNull(jsonMessage);
+ ObjectMapper mapper = new ObjectMapper();
+ TelegrafData telegrafData = mapper.readValue(jsonMessage, TelegrafData.class);
+ Assert.assertNotNull(telegrafData);
+ return telegrafData;
+ }
+
+ private void assertSample(Sample sample, String... name) {
+ Assert.assertNotNull(sample);
+ Assert.assertTrue(Arrays.asList(name).contains(sample.getName()));
+ Assert.assertEquals(ImmutableMap.copyOf(Collections.singletonMap("host", "localHost")), sample.getLabels());
+ Assert.assertTrue(sample.getValue() >= 0);
+ Assert.assertTrue(sample.getTimestamp() >= 0);
+ }
+
+ private void assertConvertToSample(TelegrafData telegrafData, int totalSamplesPerMetrics, String... name) {
+ Assert.assertNotNull(telegrafData);
+ List<TelegrafDatum> metrics = telegrafData.getMetrics();
+ Assert.assertNotNull(metrics);
+ for (TelegrafDatum t : metrics) {
+ boolean convert = false;
+ List<Sample> samples = telegrafServiceHandler.convertTelegraf(t);
+ Assert.assertEquals(totalSamplesPerMetrics, samples.size());
+ for (Sample s : samples) {
+ assertSample(s, name);
+ convert = true;
+ }
+ if (!convert) {
+ // Fail when the datum produced no samples at all
+ throw new AssertionError("Failed to convert json telegraf message to Sample.");
+ }
+ }
+ }
+
+ private void assertConvertToSampleFamily(TelegrafData telegrafData, int sampleFamilySize, int samplePerSampleFamily, String... name) {
+ Assert.assertNotNull(telegrafData);
+ List<ImmutableMap<String, SampleFamily>> sampleFamilyCollections = telegrafServiceHandler.convertSampleFamily(telegrafData);
+ Assert.assertNotNull(sampleFamilyCollections);
+ int actualSize = 0;
+ for (ImmutableMap<String, SampleFamily> samples : sampleFamilyCollections) {
+ samples.forEach((k, v) -> Assert.assertEquals(samplePerSampleFamily, v.samples.length));
+ actualSize += samples.size();
+ }
+ Assert.assertEquals(sampleFamilySize, actualSize);
+ sampleFamilyCollections.forEach(sampleFamily -> {
+ for (Map.Entry<String, SampleFamily> entry : sampleFamily.entrySet()) {
+ String key = entry.getKey();
+ SampleFamily value = entry.getValue();
+ boolean convert = false;
+ Assert.assertTrue(Arrays.asList(name).contains(key));
+ for (Sample s : value.samples) {
+ assertSample(s, name);
+ convert = true;
+ }
+ if (!convert) {
+ // Fail when the sample family contains no samples at all
+ throw new AssertionError("Failed to convert json telegraf message to Sample.");
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testOneMemMetrics() throws Throwable {
+ String oneMemMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}]}";
+
+ TelegrafData telegrafData = assertTelegrafJSONConvert(oneMemMetrics);
+ String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+ assertConvertToSample(telegrafData, 5, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 5, 1, metricsNames);
+ }
+
+ @Test
+ public void testMultipleMemMetrics() throws Throwable {
+ String threeMemMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"available\":6047563904,\"available_percent\":56.41215070500567,\"total\":27048549120,\"used\":340364409216,\"used_percent\":44.58454929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663491390}, " +
+ "{\"fields\":" +
+ "{\"available\":5047739904,\"available_percent\":43.41215070500567,\"total\":46078149120,\"used\":45030409216,\"used_percent\":23.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1453365320}]}";
+
+ TelegrafData telegrafData = assertTelegrafJSONConvert(threeMemMetrics);
+ String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+ assertConvertToSample(telegrafData, 5, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 15, 1, metricsNames);
+ }
+
+ @Test
+ public void testOneCpuMetrics() throws Throwable {
+ String oneCpuMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}]}";
+
+ TelegrafData telegrafData = assertTelegrafJSONConvert(oneCpuMetrics);
+ String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+ assertConvertToSample(telegrafData, 4, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 4, 1, metricsNames);
+ }
+
+ @Test
+ public void testMultipleCpuMetrics() throws Throwable {
+ String twoCpuMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1453365320}]}";
+
+ TelegrafData telegrafData = assertTelegrafJSONConvert(twoCpuMetrics);
+ String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+ assertConvertToSample(telegrafData, 4, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 8, 1, metricsNames);
+ }
+
+ @Test
+ public void testMultipleCpuMetricsWithSameTimestamp() throws Throwable {
+ String twoCpuMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":67.32710280373831,\"usage_irq\":0.5415264797507788,\"usage_system\":1.6533956386292835,\"usage_user\":76.54797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":74.32710280373831,\"usage_irq\":1.2535264797507788,\"usage_system\":4.5633956386292835,\"usage_user\":54.23797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}]}";
+
+ TelegrafData telegrafData = assertTelegrafJSONConvert(twoCpuMetrics);
+ String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+ assertConvertToSample(telegrafData, 4, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 4, 4, metricsNames);
+ }
+
+ @Test
+ public void testInvalidJSONConvert() {
+ String invalidMetrics = "This is an invalid metrics message";
+
+ Assert.assertThrows("Expected JsonParseException to throw, but it didn't.",
+ JsonParseException.class, () -> assertTelegrafJSONConvert(invalidMetrics));
+ }
+
+ @Test
+ public void testInvalidSampleNames() {
+ String oneCpuMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}]}";
+
+ Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+ AssertionError.class, () -> {
+ TelegrafData telegrafData = assertTelegrafJSONConvert(oneCpuMetrics);
+ String[] wrongMetricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+ assertConvertToSample(telegrafData, 4, wrongMetricsNames);
+ });
+ }
+
+ @Test
+ public void testInvalidSampleFamilyNames() {
+ String oneMemMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}]}";
+
+ Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+ AssertionError.class, () -> {
+ TelegrafData telegrafData = assertTelegrafJSONConvert(oneMemMetrics);
+ String[] wrongMetricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+ assertConvertToSampleFamily(telegrafData, 5, 1, wrongMetricsNames);
+ });
+ }
+
+ @Test
+ public void testWrongSampleNumbers() {
+ String twoCpuMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1453365320}]}";
+
+ Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+ AssertionError.class, () -> {
+ TelegrafData telegrafData = assertTelegrafJSONConvert(twoCpuMetrics);
+ String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+ assertConvertToSample(telegrafData, 6, metricsNames);
+ });
+ }
+
+ @Test
+ public void testWrongSampleFamilySize() {
+ String threeMemMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"available\":6047563904,\"available_percent\":56.41215070500567,\"total\":27048549120,\"used\":340364409216,\"used_percent\":44.58454929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663491390}, " +
+ "{\"fields\":" +
+ "{\"available\":5047739904,\"available_percent\":43.41215070500567,\"total\":46078149120,\"used\":45030409216,\"used_percent\":23.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1453365320}]}";
+
+ Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+ AssertionError.class, () -> {
+ TelegrafData telegrafData = assertTelegrafJSONConvert(threeMemMetrics);
+ String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+ assertConvertToSample(telegrafData, 5, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 3, 1, metricsNames);
+ });
+ }
+
+ @Test
+ public void testWrongSampleNumbersOfSampleFamily() {
+ String threeMemMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"available\":6047563904,\"available_percent\":56.41215070500567,\"total\":27048549120,\"used\":340364409216,\"used_percent\":44.58454929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663491390}, " +
+ "{\"fields\":" +
+ "{\"available\":5047739904,\"available_percent\":43.41215070500567,\"total\":46078149120,\"used\":45030409216,\"used_percent\":23.58784929499433}," +
+ "\"name\":\"mem\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1453365320}]}";
+
+ Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+ AssertionError.class, () -> {
+ TelegrafData telegrafData = assertTelegrafJSONConvert(threeMemMetrics);
+ String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+ assertConvertToSample(telegrafData, 5, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 15, 2, metricsNames);
+ });
+ }
+
+ @Test
+ public void testWrongSampleFamilySizeWithSameTimestamp() {
+ String fourCpuMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":67.32710280373831,\"usage_irq\":0.5415264797507788,\"usage_system\":1.6533956386292835,\"usage_user\":76.54797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":74.32710280373831,\"usage_irq\":1.2535264797507788,\"usage_system\":4.5633956386292835,\"usage_user\":54.23797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}]}";
+
+ Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+ AssertionError.class, () -> {
+ TelegrafData telegrafData = assertTelegrafJSONConvert(fourCpuMetrics);
+ String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+ assertConvertToSample(telegrafData, 4, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 3, 4, metricsNames);
+ });
+ }
+
+ @Test
+ public void testWrongSampleNumbersOfSampleFamilyWithSameTimestamp() {
+ String fourCpuMetrics = "{\"metrics\":" +
+ "[{\"fields\":" +
+ "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":67.32710280373831,\"usage_irq\":0.5415264797507788,\"usage_system\":1.6533956386292835,\"usage_user\":76.54797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":74.32710280373831,\"usage_irq\":1.2535264797507788,\"usage_system\":4.5633956386292835,\"usage_user\":54.23797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}, " +
+ "{\"fields\":" +
+ "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+ "\"name\":\"cpu\"," +
+ "\"tags\":{\"host\":\"localHost\"}," +
+ "\"timestamp\":1663391390}]}";
+
+ Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+ AssertionError.class, () -> {
+ TelegrafData telegrafData = assertTelegrafJSONConvert(fourCpuMetrics);
+ String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+ assertConvertToSample(telegrafData, 4, metricsNames);
+ assertConvertToSampleFamily(telegrafData, 4, 2, metricsNames);
+ });
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleManager.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleManager.java
new file mode 100644
index 0000000000..6db9734b25
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.mock;
+
+import com.google.common.collect.Maps;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleNotFoundRuntimeException;
+import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
+
+import java.util.Map;
+
+public abstract class MockModuleManager extends ModuleManager {
+ private final Map<String, ModuleProviderHolder> moduleProviderHolderMap = Maps.newHashMap();
+
+ public MockModuleManager() {
+ init();
+ }
+
+ protected abstract void init();
+
+ protected void register(String name, ModuleProviderHolder provider) {
+ moduleProviderHolderMap.put(name, provider);
+ }
+
+ @Override
+ public boolean has(String moduleName) {
+ return moduleProviderHolderMap.containsKey(moduleName);
+ }
+
+ @Override
+ public ModuleProviderHolder find(String moduleName) throws ModuleNotFoundRuntimeException {
+ if (!moduleProviderHolderMap.containsKey(moduleName)) {
+ throw new ModuleNotFoundRuntimeException("ModuleProviderHolder[" + moduleName + "] cannot be found in MOCK.");
+ }
+ return moduleProviderHolderMap.get(moduleName);
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleProvider.java
new file mode 100644
index 0000000000..5e51194aec
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.mock;
+
+import com.google.common.collect.Maps;
+import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+
+import java.util.Map;
+
+public abstract class MockModuleProvider implements ModuleServiceHolder {
+ protected Map<Class<? extends Service>, Service> serviceMap = Maps.newHashMap();
+
+ public MockModuleProvider() {
+ register();
+ }
+
+ protected abstract void register();
+
+ @Override
+ public void registerServiceImplementation(final Class<? extends Service> serviceType,
+ final Service service) throws ServiceNotProvidedException {
+ serviceMap.put(serviceType, service);
+ }
+
+ @Override
+ public <T extends Service> T getService(final Class<T> serviceType) throws ServiceNotProvidedException {
+ return serviceType.cast(serviceMap.get(serviceType));
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/resources/telegraf-rules/vm.yaml b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/resources/telegraf-rules/vm.yaml
new file mode 100644
index 0000000000..e47c98a7d6
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/resources/telegraf-rules/vm.yaml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+expSuffix: service(['host'], Layer.OS_LINUX)
+metricPrefix: meter_vm
+metricsRules:
+
+ # cpu
+ - name: cpu_total_percentage
+ exp: cpu_usage_active.tagEqual('cpu', 'cpu-total')
+ - name: cpu_average_used
+ exp: cpu_usage_active.tagNotEqual('cpu', 'cpu-total').avg(['host', 'cpu'])
+ - name: cpu_load1
+ exp: system_load1
+ - name: cpu_load5
+ exp: system_load5
+ - name: cpu_load15
+ exp: system_load15
+
+ # memory
+ - name: memory_total
+ exp: mem_total
+ - name: memory_available
+ exp: mem_available
+ - name: memory_used
+ exp: mem_used
+
+ # swap
+ - name: memory_swap_free
+ exp: mem_swap_free
+ - name: memory_swap_total
+ exp: mem_swap_total
+ - name: memory_swap_percentage
+ exp: 100 - ((mem_swap_free / mem_swap_total) * 100)
+
+ #node filesystem
+ - name: filesystem_percentage
+ exp: disk_used_percent.avg(['host','device'])
+
+ #node disk
+ - name: disk_read
+ exp: diskio_read_bytes.rate('PT1M')
+ - name: disk_written
+ exp: diskio_write_bytes.rate('PT1M')
+
+ #node net
+ - name: network_receive
+ exp: net_bytes_recv.irate()
+ - name: network_transmit
+ exp: net_bytes_sent.irate()
+
+ #node netstat
+ - name: tcp_curr_estab
+ exp: netstat_tcp_established
+ - name: tcp_tw
+ exp: netstat_tcp_time_wait
+ - name: tcp_alloc
+ exp: netstat_tcp_listen
+ - name: udp_inuse
+ exp: netstat_udp_socket
\ No newline at end of file
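The rules above reference samples such as cpu_usage_active and mem_total, while the Telegraf JSON carries a measurement name ("cpu", "mem") and a separate "fields" object. A minimal sketch of how such names could be derived, assuming the receiver joins measurement and field key with an underscore (the class and method names here are hypothetical, not the plugin's actual API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the receiver's real conversion code.
public class TelegrafNameSketch {

    // Assumption: sample name = "<measurement>_<field key>".
    static Map<String, Double> flatten(String measurement, Map<String, Double> fields) {
        Map<String, Double> samples = new LinkedHashMap<>();
        for (Map.Entry<String, Double> field : fields.entrySet()) {
            samples.put(measurement + "_" + field.getKey(), field.getValue());
        }
        return samples;
    }

    public static void main(String[] args) {
        Map<String, Double> fields = new LinkedHashMap<>();
        fields.put("usage_active", 4.67);
        fields.put("usage_idle", 95.33);
        // Prints the derived sample names in insertion order.
        System.out.println(flatten("cpu", fields).keySet());
    }
}
```

Under that assumption, a rule such as "exp: cpu_usage_active.tagEqual('cpu', 'cpu-total')" would select the sample derived from the "usage_active" field of the "cpu" measurement.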
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 219b1dadeb..f54ea1b4ec 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -166,6 +166,11 @@
<artifactId>skywalking-ebpf-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>skywalking-telegraf-receiver-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- receiver module -->
<!-- fetcher module -->
@@ -314,6 +319,7 @@
<exclude>zabbix-rules/</exclude>
<exclude>lal/</exclude>
<exclude>log-mal-rules/</exclude>
+ <exclude>telegraf-rules/</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 06e7eb565e..4d7ad03721 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -536,3 +536,8 @@ receiver-event:
receiver-ebpf:
selector: ${SW_RECEIVER_EBPF:default}
default:
+
+receiver-telegraf:
+ selector: ${SW_RECEIVER_TELEGRAF:default}
+ default:
+ activeFiles: ${SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm}
\ No newline at end of file
diff --git a/oap-server/server-starter/src/main/resources/telegraf-rules/vm.yaml b/oap-server/server-starter/src/main/resources/telegraf-rules/vm.yaml
new file mode 100644
index 0000000000..e47c98a7d6
--- /dev/null
+++ b/oap-server/server-starter/src/main/resources/telegraf-rules/vm.yaml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+expSuffix: service(['host'], Layer.OS_LINUX)
+metricPrefix: meter_vm
+metricsRules:
+
+ # cpu
+ - name: cpu_total_percentage
+ exp: cpu_usage_active.tagEqual('cpu', 'cpu-total')
+ - name: cpu_average_used
+ exp: cpu_usage_active.tagNotEqual('cpu', 'cpu-total').avg(['host', 'cpu'])
+ - name: cpu_load1
+ exp: system_load1
+ - name: cpu_load5
+ exp: system_load5
+ - name: cpu_load15
+ exp: system_load15
+
+ # memory
+ - name: memory_total
+ exp: mem_total
+ - name: memory_available
+ exp: mem_available
+ - name: memory_used
+ exp: mem_used
+
+ # swap
+ - name: memory_swap_free
+ exp: mem_swap_free
+ - name: memory_swap_total
+ exp: mem_swap_total
+ - name: memory_swap_percentage
+ exp: 100 - ((mem_swap_free / mem_swap_total) * 100)
+
+ #node filesystem
+ - name: filesystem_percentage
+ exp: disk_used_percent.avg(['host','device'])
+
+ #node disk
+ - name: disk_read
+ exp: diskio_read_bytes.rate('PT1M')
+ - name: disk_written
+ exp: diskio_write_bytes.rate('PT1M')
+
+ #node net
+ - name: network_receive
+ exp: net_bytes_recv.irate()
+ - name: network_transmit
+ exp: net_bytes_sent.irate()
+
+ #node netstat
+ - name: tcp_curr_estab
+ exp: netstat_tcp_established
+ - name: tcp_tw
+ exp: netstat_tcp_time_wait
+ - name: tcp_alloc
+ exp: netstat_tcp_listen
+ - name: udp_inuse
+ exp: netstat_udp_socket
\ No newline at end of file
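The memory_swap_percentage rule above computes 100 - ((mem_swap_free / mem_swap_total) * 100). The same arithmetic, as a small sketch in plain Java. The class name is hypothetical, and the NaN guard for hosts without swap is an assumption of this sketch, not something the rule file or MAL is known to do:

```java
// Hypothetical illustration of the arithmetic behind memory_swap_percentage.
public class SwapPercentSketch {

    // Returns the used share of swap in percent.
    // Assumption: a non-positive total (no swap configured) yields NaN here.
    static double swapUsedPercent(double swapFree, double swapTotal) {
        if (swapTotal <= 0) {
            return Double.NaN;
        }
        return 100 - ((swapFree / swapTotal) * 100);
    }

    public static void main(String[] args) {
        // 512 free out of 2048 total leaves 75% of swap in use.
        System.out.println(swapUsedPercent(512, 2048));
    }
}
```

Hosts with swap disabled report a total of zero, so the division in the rule may be worth checking against real data before relying on this metric.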
diff --git a/test/e2e-v2/cases/vm/telegraf/docker-compose.yml b/test/e2e-v2/cases/vm/telegraf/docker-compose.yml
new file mode 100644
index 0000000000..654bb1d9a1
--- /dev/null
+++ b/test/e2e-v2/cases/vm/telegraf/docker-compose.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_RECEIVER_TELEGRAF: default
+ SW_RECEIVER_TELEGRAF_ACTIVE_FILES: vm
+ ports:
+ - 12800
+
+ telegraf:
+ image: telegraf:1.24
+ networks:
+ - e2e
+ volumes:
+ - ./telegraf.conf:/etc/telegraf/telegraf.conf
+ depends_on:
+ oap:
+ condition: service_healthy
+
+networks:
+ e2e:
\ No newline at end of file
diff --git a/test/e2e-v2/cases/vm/telegraf/e2e.yaml b/test/e2e-v2/cases/vm/telegraf/e2e.yaml
new file mode 100644
index 0000000000..a897c556cc
--- /dev/null
+++ b/test/e2e-v2/cases/vm/telegraf/e2e.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: set PATH
+ command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+verify:
+ retry:
+ count: 20
+ interval: 3s
+ cases:
+ # service list
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
+ expected: ../expected/service.yml
+ # linear metrics
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_memory_used --service-name=vm-service |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_memory_total --service-name=vm-service |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_memory_available --service-name=vm-service |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_total_percentage --service-name=vm-service |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_load1 --service-name=vm-service |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_load5 --service-name=vm-service |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_load15 --service-name=vm-service |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
\ No newline at end of file
diff --git a/test/e2e-v2/cases/vm/telegraf/telegraf.conf b/test/e2e-v2/cases/vm/telegraf/telegraf.conf
new file mode 100644
index 0000000000..0b8295f6f9
--- /dev/null
+++ b/test/e2e-v2/cases/vm/telegraf/telegraf.conf
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Global tags can be specified here in key="value" format.
+[global_tags]
+ # dc = "us-east-1" # will tag all metrics with dc=us-east-1
+ # rack = "1a"
+ ## Environment variables can be used as tags, and throughout the config file
+ # user = "$USER"
+
+
+# Configuration for telegraf agent
+[agent]
+ ## Default data collection interval for all inputs
+ interval = "10s"
+ ## Rounds collection interval to 'interval'
+ ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
+ round_interval = true
+
+ ## Telegraf will send metrics to outputs in batches of at most
+ ## metric_batch_size metrics.
+ ## This controls the size of writes that Telegraf sends to output plugins.
+ metric_batch_size = 1000
+
+ ## Maximum number of unwritten metrics per output. Increasing this value
+ ## allows for longer periods of output downtime without dropping metrics at the
+ ## cost of higher maximum memory usage.
+ metric_buffer_limit = 10000
+
+ ## Collection jitter is used to jitter the collection by a random amount.
+ ## Each plugin will sleep for a random time within jitter before collecting.
+ ## This can be used to avoid many plugins querying things like sysfs at the
+ ## same time, which can have a measurable effect on the system.
+ collection_jitter = "0s"
+
+ ## Collection offset is used to shift the collection by the given amount.
+ ## This can be used to avoid many plugins querying constrained devices
+ ## at the same time by manually scheduling them in time.
+ # collection_offset = "0s"
+
+ ## Default flushing interval for all outputs. Maximum flush_interval will be
+ ## flush_interval + flush_jitter
+ flush_interval = "3s"
+ ## Jitter the flush interval by a random amount. This is primarily to avoid
+ ## large write spikes for users running a large number of telegraf instances.
+ ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
+ flush_jitter = "0s"
+
+ ## Collected metrics are rounded to the precision specified. Precision is
+ ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
+ ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
+ ##
+ ## By default or when set to "0s", precision will be set to the same
+ ## timestamp order as the collection interval, with the maximum being 1s:
+ ## ie, when interval = "10s", precision will be "1s"
+ ## when interval = "250ms", precision will be "1ms"
+ ##
+ ## Precision will NOT be used for service inputs. It is up to each individual
+ ## service input to set the timestamp at the appropriate precision.
+ precision = "0s"
+
+ ## Override default hostname, if empty use os.Hostname()
+ hostname = "vm-service"
+ ## If set to true, do not set the "host" tag in the telegraf agent.
+ omit_hostname = false
+
+
+###############################################################################
+# OUTPUT PLUGINS #
+###############################################################################
+
+
+# A plugin that can transmit metrics over HTTP
+[[outputs.http]]
+ ## URL is the address to send metrics to
+ url = "http://oap:12800/telegraf"
+
+ ## Timeout for HTTP message
+ timeout = "10s"
+
+ ## HTTP method, one of: "POST" or "PUT"
+ method = "POST"
+
+ ## Data format to output.
+ ## Each data format has its own unique set of configuration options, read
+ ## more about them here:
+ ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
+ data_format = "json"
+
+
+[[outputs.file]]
+ ## Files to write to, "stdout" is a specially handled file.
+ files = ["stdout", "/tmp/metrics.out"]
+
+###############################################################################
+# INPUT PLUGINS #
+###############################################################################
+
+# Read metrics about cpu usage
+[[inputs.cpu]]
+ ## Whether to report per-cpu stats or not
+ percpu = true
+ ## Whether to report total system cpu stats or not
+ totalcpu = true
+ ## If true, collect raw CPU time metrics
+ collect_cpu_time = false
+ ## If true, compute and report the sum of all non-idle CPU states
+ report_active = true
+
+
+# Read metrics about memory usage
+[[inputs.mem]]
+ # no configuration
+
+
+[[inputs.system]]
+ # no configuration
+
+[[inputs.disk]]
+ # no configuration
+
+[[inputs.diskio]]
+ # no configuration
+
+[[inputs.net]]
+ ## By default, telegraf gathers stats from any up interface (excluding loopback)
+ ## Setting interfaces will tell it to gather these explicit interfaces,
+ ## regardless of status. When specifying an interface, glob-style
+ ## patterns are also supported.
+ ##
+ # interfaces = ["eth*", "enp0s[0-1]", "lo"]
+ ##
+ ## On linux systems telegraf also collects protocol stats.
+ ## Setting ignore_protocol_stats to true will skip reporting of protocol metrics.
+ ##
+ # ignore_protocol_stats = false
+ ##
+
+[[inputs.netstat]]
\ No newline at end of file
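The telegraf.conf above has Telegraf POST JSON to http://oap:12800/telegraf. For manual testing without a Telegraf agent, an equivalent request could be built with the JDK's java.net.http client, roughly as sketched below. The class name, the Content-Type header, and the sample payload are assumptions of this sketch; the payload follows the {"metrics":[...]} batch shape of Telegraf's JSON serializer, with the fields/name/tags/timestamp keys seen in the unit tests of this commit.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper for hand-testing the receiver endpoint; requires Java 11+.
public class TelegrafPostSketch {

    // Builds (but does not send) a POST to "<oapBaseUrl>/telegraf".
    static HttpRequest buildRequest(String oapBaseUrl, String jsonBody) {
        return HttpRequest.newBuilder()
            .uri(URI.create(oapBaseUrl + "/telegraf"))
            .timeout(Duration.ofSeconds(10))             // mirrors timeout = "10s"
            .header("Content-Type", "application/json")  // assumption, not taken from the config
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    }

    public static void main(String[] args) {
        // Illustrative payload with made-up values.
        String body = "{\"metrics\":[{\"fields\":{\"total\":2048,\"used\":512},"
            + "\"name\":\"mem\",\"tags\":{\"host\":\"vm-service\"},"
            + "\"timestamp\":1663391390}]}";
        HttpRequest request = buildRequest("http://oap:12800", body);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would then be a matter of passing it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) from a host that can resolve the "oap" service name.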