Posted to notifications@skywalking.apache.org by li...@apache.org on 2022/11/05 00:08:35 UTC

[skywalking] branch master updated: Support the telegraf receiver plugin module (#9620)

This is an automated email from the ASF dual-hosted git repository.

liuhaoyangzz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 42f3396b64 Support the telegraf receiver plugin module (#9620)
42f3396b64 is described below

commit 42f3396b647f331a29f7a4deef427ba2fbbd284d
Author: soander <Wa...@outlook.com>
AuthorDate: Fri Nov 4 17:08:20 2022 -0700

    Support the telegraf receiver plugin module (#9620)
    
    * The telegraf receiver plugin module development.
    
    * Refactored code of converting telegraf data and fixed some errors.
    
    * Support Telegraf receiver plugin module.
    
    * Add telegraf-receiver.md, update changes.md and vm-monitoring.md.
    
    * Rename YAML file and change code style.
    
    * Change receiver-telegraf of application.yml.
    
    * The receiver-telegraf e2e test.
    
    * The telegraf receiver e2e test.
    
    * Fix an issue and delete redundant configs.
    
    * Add Unit Test about converting Telegraf metrics.
    
    * Adjust Unit Test about converting Telegraf metrics.
    
    * Add License Header to Unit Test and change binary.xml.
    
    * Change module provider's config initialization mechanism.
    
    * Exclude the telegraf-rules in server-starter pom.xml.
    
    * Fix telegraf e2e test issues.
    
    * Change telegraf e2e test.
    
    * Fix issues
    
    * Fix issues.
    
    * Add Sample convert Unit Test.
    
    * Change vm.yaml, related documents and fix some issues.
    
    * Change backend-vm-monitoring.md.
    
    * Fix SampleConvertTest checkstyle issue.
    
    * Change vm.yaml swap MAL.
    
    * Update menu.yml and binary.xml.
    
    * Update Telegraf Unit test.
    
    * Delete telegraf config package, use meter.analyzer.prometheus package to load config file.
    
    * Reorder telegraf metrics in menu.yml.
    
    * Change e2e, vm, config, linux-service and vm.md.
    
    * Update telegraf.conf file.
    
    * Change url of telegraf.conf file.
    
    * Update e2e.yaml, conf file, menu.yml and delete useless code of provider.
    
    * Update grouping sampleFamily by timestamp and name, and add new UTs.
    
    * Update vm-monitoring.md and .asf.yaml.
    
    Co-authored-by: Superskyyy (ONLINE) <Su...@outlook.com>
    Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
 .asf.yaml                                          |   1 +
 .github/workflows/skywalking.yaml                  |   2 +
 apm-dist/src/main/assembly/binary.xml              |   1 +
 docs/en/setup/backend/backend-vm-monitoring.md     |  48 +-
 docs/en/setup/backend/telegraf-receiver.md         |  45 ++
 docs/menu.yml                                      |   2 +
 oap-server/server-receiver-plugin/pom.xml          |   1 +
 .../skywalking-telegraf-receiver-plugin/pom.xml    |  44 ++
 .../telegraf/module/TelegrafReceiverModule.java    |  33 ++
 .../telegraf/provider/TelegrafModuleConfig.java    |  37 ++
 .../provider/TelegrafReceiverProvider.java         | 106 +++++
 .../provider/handler/TelegrafServiceHandler.java   | 140 ++++++
 .../provider/handler/pojo/TelegrafData.java        |  52 +++
 .../provider/handler/pojo/TelegrafDatum.java       |  37 ++
 ...ywalking.oap.server.library.module.ModuleDefine |  19 +
 ...alking.oap.server.library.module.ModuleProvider |  19 +
 .../receiver/telegraf/TelegrafMetricsTest.java     | 490 +++++++++++++++++++++
 .../receiver/telegraf/mock/MockModuleManager.java  |  53 +++
 .../receiver/telegraf/mock/MockModuleProvider.java |  47 ++
 .../src/test/resources/telegraf-rules/vm.yaml      |  72 +++
 oap-server/server-starter/pom.xml                  |   6 +
 .../src/main/resources/application.yml             |   5 +
 .../src/main/resources/telegraf-rules/vm.yaml      |  72 +++
 test/e2e-v2/cases/vm/telegraf/docker-compose.yml   |  40 ++
 test/e2e-v2/cases/vm/telegraf/e2e.yaml             |  53 +++
 test/e2e-v2/cases/vm/telegraf/telegraf.conf        | 152 +++++++
 26 files changed, 1561 insertions(+), 16 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 275f60485f..3820cd5524 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -33,6 +33,7 @@ github:
     - open-telemetry
     - zabbix
     - ebpf
+    - telegraf
   enabled_merge_buttons:
     squash:  true
     merge:   false
diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index 3bd49516a8..6341f7260b 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -549,6 +549,8 @@ jobs:
             config: test/e2e-v2/cases/vm/zabbix/e2e.yaml
           - name: VM Prometheus
             config: test/e2e-v2/cases/vm/prometheus-node-exporter/e2e.yaml
+          - name: VM Telegraf
+            config: test/e2e-v2/cases/vm/telegraf/e2e.yaml
           - name: So11y
             config: test/e2e-v2/cases/so11y/e2e.yaml
           - name: MySQL Prometheus and slowsql
diff --git a/apm-dist/src/main/assembly/binary.xml b/apm-dist/src/main/assembly/binary.xml
index cef37bf39f..259f721272 100644
--- a/apm-dist/src/main/assembly/binary.xml
+++ b/apm-dist/src/main/assembly/binary.xml
@@ -69,6 +69,7 @@
                 <include>ui-initialized-templates/*/*.json</include>
                 <include>lal/*</include>
                 <include>log-mal-rules/*</include>
+                <include>telegraf-rules/*</include>
             </includes>
             <outputDirectory>config</outputDirectory>
         </fileSet>
diff --git a/docs/en/setup/backend/backend-vm-monitoring.md b/docs/en/setup/backend/backend-vm-monitoring.md
index 45589b3588..9edf8d1413 100644
--- a/docs/en/setup/backend/backend-vm-monitoring.md
+++ b/docs/en/setup/backend/backend-vm-monitoring.md
@@ -3,38 +3,54 @@ SkyWalking leverages Prometheus node-exporter to collect metrics data from the V
 [OpenTelemetry receiver](opentelemetry-receiver.md) and into the [Meter System](./../../concepts-and-designs/meter.md).
 VM entity as a `Service` in OAP and on the `Layer: OS_LINUX`.
 
+SkyWalking can also receive VM metrics from InfluxDB Telegraf through the [Telegraf receiver](./telegraf-receiver.md).
+The Telegraf receiver plugin receives, processes, and converts the metrics, then sends the converted metrics to the [Meter System](./../../concepts-and-designs/meter.md).
+The VM entity is reported as a `Service` in OAP, on the `Layer: OS_LINUX`.
+
 ## Data flow
+**For OpenTelemetry receiver:**
 1. The Prometheus node-exporter collects metrics data from the VMs.
 2. The OpenTelemetry Collector fetches metrics from node-exporter via Prometheus Receiver and pushes metrics to the SkyWalking OAP Server via the OpenCensus gRPC Exporter or OpenTelemetry gRPC exporter.
 3. The SkyWalking OAP Server parses the expression with [MAL](../../concepts-and-designs/mal.md) to filter/calculate/aggregate and store the results.
 
+**For Telegraf receiver:**
+1. The InfluxDB Telegraf [input plugins](https://docs.influxdata.com/telegraf/v1.24/plugins/) collect various metrics data from the VMs.
+2. The `cpu`, `mem`, `system`, `disk`, and `diskio` input plugins should be configured in the `telegraf.conf` file.
+3. InfluxDB Telegraf sends the metrics in `JSON` format over `HTTP` to the Telegraf receiver, which pushes the converted metrics to the SkyWalking OAP Server [Meter System](./../../concepts-and-designs/meter.md).
+4. The SkyWalking OAP Server parses the expression with [MAL](../../concepts-and-designs/mal.md) to filter/calculate/aggregate and store the results.
+5. For the Telegraf receiver, the `meter_vm_cpu_average_used` metric indicates the average usage of each CPU core.
 
 ## Setup
-
+**For OpenTelemetry receiver:**
 1. Setup [Prometheus node-exporter](https://prometheus.io/docs/guides/node-exporter/).
 2. Setup [OpenTelemetry Collector ](https://opentelemetry.io/docs/collector/). This is an example for OpenTelemetry Collector configuration [otel-collector-config.yaml](../../../../test/e2e-v2/cases/vm/prometheus-node-exporter/otel-collector-config.yaml).
 3. Config SkyWalking [OpenTelemetry receiver](opentelemetry-receiver.md).
 
+**For Telegraf receiver:**
+1. Set up InfluxDB Telegraf's `telegraf.conf` file according to the [official Telegraf documentation](https://docs.influxdata.com/telegraf/v1.24/).
+2. Apply the receiver-specific rules to the `telegraf.conf` file according to the [Telegraf receiver document](telegraf-receiver.md).
+3. Configure the SkyWalking [Telegraf receiver](telegraf-receiver.md).
+
 ## Supported Metrics
 
-| Monitoring Panel | Unit | Metric Name | Description | Data Source |
-|-----|-----|-----|-----|-----|
-| CPU Usage | % | cpu_total_percentage | The total percentage usage of the CPU core. If there are 2 cores, the maximum usage is 200%. | Prometheus node-exporter |
-| Memory RAM Usage | MB | meter_vm_memory_used | The total RAM usage | Prometheus node-exporter |
-| Memory Swap Usage | % | meter_vm_memory_swap_percentage | The percentage usage of swap memory | Prometheus node-exporter |
-| CPU Average Used | % | meter_vm_cpu_average_used | The percentage usage of the CPU core in each mode | Prometheus node-exporter |
-| CPU Load |  | meter_vm_cpu_load1<br />meter_vm_cpu_load5<br />meter_vm_cpu_load15 | The CPU 1m / 5m / 15m average load | Prometheus node-exporter |
-| Memory RAM | MB | meter_vm_memory_total<br />meter_vm_memory_available<br />meter_vm_memory_used | The RAM statistics, including Total / Available / Used | Prometheus node-exporter |
-| Memory Swap | MB | meter_vm_memory_swap_free<br />meter_vm_memory_swap_total | Swap memory statistics, including Free / Total | Prometheus node-exporter |
-| File System Mountpoint Usage | % | meter_vm_filesystem_percentage | The percentage usage of the file system at each mount point | Prometheus node-exporter |
-| Disk R/W | KB/s | meter_vm_disk_read,meter_vm_disk_written | The disk read and written | Prometheus node-exporter |
-| Network Bandwidth Usage | KB/s | meter_vm_network_receive<br />meter_vm_network_transmit | The network receive and transmit | Prometheus node-exporter |
-| Network Status |  | meter_vm_tcp_curr_estab<br />meter_vm_tcp_tw<br />meter_vm_tcp_alloc<br />meter_vm_sockets_used<br />meter_vm_udp_inuse | The number of TCPs established / TCP time wait / TCPs allocated / sockets in use / UDPs in use | Prometheus node-exporter |
-| Filefd Allocated |  | meter_vm_filefd_allocated | The number of file descriptors allocated | Prometheus node-exporter |
+| Monitoring Panel             | Unit | Metric Name                                                                                                             | Description                                                                                    | Data Source                                         |
+|------------------------------|------|-------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------|-----------------------------------------------------|
+| CPU Usage                    | %    | meter_vm_cpu_total_percentage                                                                                           | The total percentage usage of the CPU core. If there are 2 cores, the maximum usage is 200%.   | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory RAM Usage             | MB   | meter_vm_memory_used                                                                                                    | The total RAM usage                                                                            | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory Swap Usage            | %    | meter_vm_memory_swap_percentage                                                                                         | The percentage usage of swap memory                                                            | Prometheus node-exporter<br />Telegraf input plugin |
+| CPU Average Used             | %    | meter_vm_cpu_average_used                                                                                               | The percentage usage of the CPU core in each mode                                              | Prometheus node-exporter<br />Telegraf input plugin |
+| CPU Load                     |      | meter_vm_cpu_load1<br />meter_vm_cpu_load5<br />meter_vm_cpu_load15                                                     | The CPU 1m / 5m / 15m average load                                                             | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory RAM                   | MB   | meter_vm_memory_total<br />meter_vm_memory_available<br />meter_vm_memory_used                                          | The RAM statistics, including Total / Available / Used                                         | Prometheus node-exporter<br />Telegraf input plugin |
+| Memory Swap                  | MB   | meter_vm_memory_swap_free<br />meter_vm_memory_swap_total                                                               | Swap memory statistics, including Free / Total                                                 | Prometheus node-exporter<br />Telegraf input plugin |
+| File System Mountpoint Usage | %    | meter_vm_filesystem_percentage                                                                                          | The percentage usage of the file system at each mount point                                    | Prometheus node-exporter<br />Telegraf input plugin |
+| Disk R/W                     | KB/s | meter_vm_disk_read,meter_vm_disk_written                                                                                | The disk read and written                                                                      | Prometheus node-exporter<br />Telegraf input plugin |
+| Network Bandwidth Usage      | KB/s | meter_vm_network_receive<br />meter_vm_network_transmit                                                                 | The network receive and transmit                                                               | Prometheus node-exporter<br />Telegraf input plugin |
+| Network Status               |      | meter_vm_tcp_curr_estab<br />meter_vm_tcp_tw<br />meter_vm_tcp_alloc<br />meter_vm_sockets_used<br />meter_vm_udp_inuse | The number of TCPs established / TCP time wait / TCPs allocated / sockets in use / UDPs in use | Prometheus node-exporter<br />Telegraf input plugin |
+| Filefd Allocated             |      | meter_vm_filefd_allocated                                                                                               | The number of file descriptors allocated                                                       | Prometheus node-exporter                            |
 
 ## Customizing
 You can customize your own metrics/expression/dashboard panel.
-The metrics definition and expression rules are found in `/config/otel-rules/vm.yaml`.
+The metrics definition and expression rules are found in `/config/otel-rules/vm.yaml` and `/config/telegraf-rules/vm.yaml`.
 The dashboard panel confirmations are found in `/config/ui-initialized-templates/os_linux`.
 
 ## Blog
diff --git a/docs/en/setup/backend/telegraf-receiver.md b/docs/en/setup/backend/telegraf-receiver.md
new file mode 100644
index 0000000000..9512ee8704
--- /dev/null
+++ b/docs/en/setup/backend/telegraf-receiver.md
@@ -0,0 +1,45 @@
+# Telegraf receiver
+
+The Telegraf receiver receives InfluxDB Telegraf's metrics and processes them through the meter system.
+The OAP can load the configuration at bootstrap. The files are located at `$CLASSPATH/telegraf-rules`.
+If the new configuration is not well-formed, the OAP may fail to start up.
+
+See the [InfluxDB Telegraf](https://docs.influxdata.com/telegraf/v1.24/) documentation for details on Telegraf itself.
+The Telegraf receiver can handle Telegraf's [CPU Input Plugin](https://github.com/influxdata/telegraf/blob/release-1.24/plugins/inputs/cpu/README.md)
+and [Memory Input Plugin](https://github.com/influxdata/telegraf/blob/release-1.24/plugins/inputs/mem/README.md).
+
+Telegraf offers many other input plugins, and users can write custom rule files for them.
+The rule file should be in YAML format, defined by the scheme described in [MAL](../../concepts-and-designs/mal.md).
+Please see the [telegraf plugin directory](https://docs.influxdata.com/telegraf/v1.24/plugins/) for more information on input plugins.
+
+**Notice:**
+* The Telegraf receiver module uses `HTTP` to receive Telegraf's metrics,
+so the output method must be set to `[[outputs.http]]` in the `telegraf.conf` file.
+Please see the [http outputs](https://github.com/influxdata/telegraf/blob/release-1.24/plugins/outputs/http/README.md)
+for more details.
+
+* The Telegraf receiver module **only** processes Telegraf's `JSON` metrics format,
+so the data format must be set to `data_format = "json"` in the `telegraf.conf` file.
+Please see the [JSON data format](https://docs.influxdata.com/telegraf/v1.24/data_formats/output/json/)
+for more details.
+
+* The default `json_timestamp_units` in JSON output is seconds,
+and the Telegraf receiver module **only** processes timestamps in seconds.
+If users configure `json_timestamp_units` in the `telegraf.conf` file, it must be set to `json_timestamp_units = "1s"`.
+Please see the [JSON data format](https://docs.influxdata.com/telegraf/v1.24/data_formats/output/json/)
+for more details.
+
+The following is the default Telegraf receiver configuration in `application.yml`.
+Set `SW_RECEIVER_TELEGRAF:default` through a system environment variable, or change `SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm`,
+to activate the Telegraf receiver with `vm.yaml` in telegraf-rules.
+```yaml
+receiver-telegraf:
+  selector: ${SW_RECEIVER_TELEGRAF:default}
+  default:
+    activeFiles: ${SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm}
+```
+
+| Rule Name | Description    | Configuration File     | Data Source                                                             |
+|-----------|----------------|------------------------|-------------------------------------------------------------------------|
+| vm        | Metrics of VMs | telegraf-rules/vm.yaml | Telegraf input plugins --> Telegraf Receiver --> SkyWalking OAP Server  |
+
diff --git a/docs/menu.yml b/docs/menu.yml
index 220fba39bd..2f85c091c4 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -119,6 +119,8 @@ catalog:
             path: "/en/setup/backend/backend-zabbix"
           - name: "Meter Analysis"
             path: "/en/setup/backend/backend-meter"
+          - name: "Telegraf Metrics"
+            path: "/en/setup/backend/telegraf-receiver"
           - name: "Apdex Threshold"
             path: "/en/setup/backend/apdex-threshold"
           - name: "Spring Sleuth Metrics Analysis"
diff --git a/oap-server/server-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/pom.xml
index 7255194f08..1a723f6a86 100644
--- a/oap-server/server-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/pom.xml
@@ -46,6 +46,7 @@
         <module>skywalking-event-receiver-plugin</module>
         <module>skywalking-zabbix-receiver-plugin</module>
         <module>skywalking-ebpf-receiver-plugin</module>
+        <module>skywalking-telegraf-receiver-plugin</module>
     </modules>
 
     <dependencies>
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/pom.xml
new file mode 100644
index 0000000000..b1a6144c1a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>server-receiver-plugin</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>9.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>skywalking-telegraf-receiver-plugin</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>agent-analyzer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>skywalking-sharing-server-plugin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/module/TelegrafReceiverModule.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/module/TelegrafReceiverModule.java
new file mode 100644
index 0000000000..8e46f74d03
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/module/TelegrafReceiverModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.module;
+
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+
+public class TelegrafReceiverModule extends ModuleDefine {
+
+    public TelegrafReceiverModule() {
+        super("receiver-telegraf");
+    }
+
+    @Override
+    public Class[] services() {
+        return new Class[0];
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafModuleConfig.java
new file mode 100644
index 0000000000..40c40a8e3a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafModuleConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+@Setter
+@Getter
+public class TelegrafModuleConfig extends ModuleConfig {
+
+    public static final String CONFIG_PATH = "telegraf-rules";
+
+    /**
+     * active receive configs, files split by ","
+     */
+    private String activeFiles = Const.EMPTY_STRING;
+
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafReceiverProvider.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafReceiverProvider.java
new file mode 100644
index 0000000000..871294afa9
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/TelegrafReceiverProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider;
+
+import com.google.common.base.Splitter;
+import com.linecorp.armeria.common.HttpMethod;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
+import org.apache.skywalking.oap.server.receiver.telegraf.module.TelegrafReceiverModule;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.TelegrafServiceHandler;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class TelegrafReceiverProvider extends ModuleProvider {
+    private List<Rule> configs;
+    private TelegrafModuleConfig moduleConfig;
+
+    @Override
+    public String name() {
+        return "default";
+    }
+
+    @Override
+    public Class<? extends ModuleDefine> module() {
+        return TelegrafReceiverModule.class;
+    }
+
+    @Override
+    public ConfigCreator newConfigCreator() {
+        return new ConfigCreator<TelegrafModuleConfig>() {
+            @Override
+            public Class type() {
+                return TelegrafModuleConfig.class;
+            }
+
+            @Override
+            public void onInitialized(final TelegrafModuleConfig initialized) {
+                moduleConfig = initialized;
+            }
+        };
+    }
+
+    @Override
+    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+        try {
+            configs = Rules.loadRules(TelegrafModuleConfig.CONFIG_PATH,
+                    StringUtil.isEmpty(moduleConfig.getActiveFiles()) ? Collections.emptyList() : Splitter.on(",").splitToList(moduleConfig.getActiveFiles()));
+        } catch (IOException e) {
+            throw new ModuleStartException("Failed to load MAL rules", e);
+        }
+    }
+
+    @Override
+    public void start() throws ServiceNotProvidedException, ModuleStartException {
+        if (CollectionUtils.isNotEmpty(configs)) {
+            HTTPHandlerRegister httpHandlerRegister = getManager().find(SharingServerModule.NAME)
+                    .provider()
+                    .getService(HTTPHandlerRegister.class);
+            MeterSystem meterSystem = getManager().find(CoreModule.NAME)
+                    .provider()
+                    .getService(MeterSystem.class);
+            httpHandlerRegister.addHandler(new TelegrafServiceHandler(getManager(), meterSystem, configs),
+                    Collections.singletonList(HttpMethod.POST));
+        }
+    }
+
+    @Override
+    public void notifyAfterCompleted() throws ServiceNotProvidedException {
+    }
+
+    @Override
+    public String[] requiredModules() {
+        return new String[] {
+            CoreModule.NAME,
+            SharingServerModule.NAME
+        };
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/TelegrafServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/TelegrafServiceHandler.java
new file mode 100644
index 0000000000..cd4dde4170
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/TelegrafServiceHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider.handler;
+
+import com.google.common.collect.ImmutableMap;
+import com.linecorp.armeria.server.annotation.Post;
+import com.linecorp.armeria.server.annotation.RequestConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.common.v3.Commands;
+import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafData;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafDatum;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TelegrafServiceHandler {
+
+    private final HistogramMetrics histogram;
+    private final CounterMetrics errorCounter;
+    private final List<MetricConvert> metricConvert;
+
+    public TelegrafServiceHandler(ModuleManager moduleManager, MeterSystem meterSystem, List<Rule> rules) {
+
+        this.metricConvert = rules.stream().map(r -> new MetricConvert(r, meterSystem)).collect(Collectors.toList());
+
+        final MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
+                                                           .provider()
+                                                           .getService(MetricsCreator.class);
+
+        histogram = metricsCreator.createHistogramMetric(
+                "telegraf_in_latency", "The process latency of telegraf data",
+                new MetricsTag.Keys("protocol"), new MetricsTag.Values("http")
+        );
+
+        errorCounter = metricsCreator.createCounter(
+                "telegraf_error_count", "The error number of telegraf analysis",
+                new MetricsTag.Keys("protocol"), new MetricsTag.Values("http")
+        );
+    }
+
+    /**
+     * Convert one {@link TelegrafDatum} to meter {@link Sample}s, one per numeric field.
+     */
+    public List<Sample> convertTelegraf(TelegrafDatum telegrafData) {
+
+        List<Sample> sampleList = new ArrayList<>();
+
+        Map<String, Object> fields = telegrafData.getFields();
+        String name = telegrafData.getName();
+        Map<String, String> tags = telegrafData.getTags();
+        ImmutableMap<String, String> immutableTags = ImmutableMap.copyOf(tags);
+        long timestamp = telegrafData.getTimestamp();
+
+        fields.forEach((key, value) -> {
+            if (value instanceof Number) {
+                Sample.SampleBuilder builder = Sample.builder();
+                Sample sample = builder.name(name + "_" + key)
+                        .timestamp(timestamp * 1000L)
+                        .value(((Number) value).doubleValue())
+                        .labels(immutableTags).build();
+
+                sampleList.add(sample);
+            }
+        });
+        return sampleList;
+
+    }
+
+    public List<ImmutableMap<String, SampleFamily>> convertSampleFamily(TelegrafData telegrafData) {
+        List<Sample> allSamples = new ArrayList<>();
+
+        List<TelegrafDatum> metrics = telegrafData.getMetrics();
+        for (TelegrafDatum m : metrics) {
+            List<Sample> samples = convertTelegraf(m);
+            allSamples.addAll(samples);
+        }
+
+        List<ImmutableMap<String, SampleFamily>> res = new ArrayList<>();
+
+        // Grouping all samples by timestamp
+        Map<Long, List<Sample>> sampleFamilyByTime = allSamples.stream()
+                .collect(Collectors.groupingBy(Sample::getTimestamp));
+
+        // Grouping all samples with the same timestamp by name
+        for (List<Sample> s : sampleFamilyByTime.values()) {
+            ImmutableMap.Builder<String, SampleFamily> builder = ImmutableMap.builder();
+            Map<String, List<Sample>> sampleFamilyByName = s.stream()
+                    .collect(Collectors.groupingBy(Sample::getName));
+            sampleFamilyByName.forEach((k, v) ->
+                    builder.put(k, SampleFamilyBuilder.newBuilder(v.toArray(new Sample[0])).build()));
+            res.add(builder.build());
+        }
+
+        return res;
+    }
+
+    @Post("/telegraf")
+    @RequestConverter(TelegrafData.class)
+    public Commands collectData(TelegrafData telegrafData) {
+        try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
+            List<ImmutableMap<String, SampleFamily>> sampleFamily = convertSampleFamily(telegrafData);
+            sampleFamily.forEach(s -> metricConvert.forEach(m -> m.toMeter(s)));
+        } catch (Exception e) {
+            errorCounter.inc();
+            log.error(e.getMessage(), e);
+        }
+        return Commands.newBuilder().build();
+    }
+}
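Editor's note: the conversion in TelegrafServiceHandler above can be read as two steps: flatten each Telegraf metric into one sample per numeric field (named name + "_" + field, timestamp multiplied by 1000), then group the samples by timestamp and by name. The following is a minimal stand-alone sketch of that idea, not the code from this commit. The names TelegrafFlattenSketch, FlatSample, flatten and group are hypothetical, and FlatSample is only a stand-in for the OAP Sample class so that the sketch needs nothing beyond the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; FlatSample stands in for the OAP Sample class.
final class TelegrafFlattenSketch {

    /** Minimal stand-in for a meter sample. */
    static final class FlatSample {
        final String name;
        final Map<String, String> labels;
        final double value;
        final long timestampMillis;

        FlatSample(String name, Map<String, String> labels, double value, long timestampMillis) {
            this.name = name;
            this.labels = labels;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Emit one sample per numeric field; non-numeric fields are skipped. */
    static List<FlatSample> flatten(String name, Map<String, String> tags,
                                    Map<String, Object> fields, long timestampSeconds) {
        List<FlatSample> samples = new ArrayList<>();
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            if (field.getValue() instanceof Number) {
                samples.add(new FlatSample(
                        name + "_" + field.getKey(),
                        tags,
                        ((Number) field.getValue()).doubleValue(),
                        timestampSeconds * 1000L)); // seconds to milliseconds
            }
        }
        return samples;
    }

    /** Group samples by timestamp first, then by sample name. */
    static Map<Long, Map<String, List<FlatSample>>> group(List<FlatSample> samples) {
        Map<Long, Map<String, List<FlatSample>>> byTime = new TreeMap<>();
        for (FlatSample s : samples) {
            byTime.computeIfAbsent(s.timestampMillis, t -> new TreeMap<>())
                  .computeIfAbsent(s.name, n -> new ArrayList<>())
                  .add(s);
        }
        return byTime;
    }
}
```

Under this sketch, two "cpu" metrics that share a timestamp end up in a single timestamp group with two samples per name, while metrics with different timestamps land in separate groups. This matches what the unit tests in this commit assert for convertSampleFamily.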
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafData.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafData.java
new file mode 100644
index 0000000000..01db226aa6
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linecorp.armeria.common.AggregatedHttpRequest;
+import com.linecorp.armeria.common.annotation.Nullable;
+import com.linecorp.armeria.server.ServiceRequestContext;
+import com.linecorp.armeria.server.annotation.RequestConverterFunction;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TelegrafData implements RequestConverterFunction {
+    private List<TelegrafDatum> metrics;
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Override
+    public @Nullable Object convertRequest(ServiceRequestContext ctx, AggregatedHttpRequest request, Class<?> expectedResultType,
+                                           @Nullable ParameterizedType expectedParameterizedResultType) throws Exception {
+
+        if (expectedResultType == TelegrafData.class) {
+            // Convert the request to a TelegrafData object
+            return MAPPER.readValue(request.contentUtf8(), TelegrafData.class);
+        }
+        return RequestConverterFunction.fallthrough();
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafDatum.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafDatum.java
new file mode 100644
index 0000000000..7a53f4e7d7
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/telegraf/provider/handler/pojo/TelegrafDatum.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TelegrafDatum {
+    private Map<String, Object> fields;
+    private String name;
+    private Map<String, String> tags;
+    private long timestamp;
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
new file mode 100644
index 0000000000..1aa67acc02
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.telegraf.module.TelegrafReceiverModule
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000000..4fb8e36621
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.telegraf.provider.TelegrafReceiverProvider
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/TelegrafMetricsTest.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/TelegrafMetricsTest.java
new file mode 100644
index 0000000000..98c8132409
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/TelegrafMetricsTest.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.CoreModuleProvider;
+import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.receiver.telegraf.mock.MockModuleManager;
+import org.apache.skywalking.oap.server.receiver.telegraf.mock.MockModuleProvider;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.TelegrafModuleConfig;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.TelegrafServiceHandler;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafData;
+import org.apache.skywalking.oap.server.receiver.telegraf.provider.handler.pojo.TelegrafDatum;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.powermock.reflect.Whitebox;
+import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonParseException;
+import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class TelegrafMetricsTest {
+
+    protected CoreModuleProvider moduleProvider;
+    protected ModuleManager moduleManager;
+    protected MeterSystem meterSystem;
+    protected TelegrafServiceHandler telegrafServiceHandler;
+
+    private List<AcceptableValue> values = new ArrayList<>();
+
+    @BeforeClass
+    public static void setup() {
+        MeterEntity.setNamingControl(
+                new NamingControl(512, 512, 512, new EndpointNameGrouping()));
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        MeterEntity.setNamingControl(null);
+    }
+
+    @Before
+    public void setupMetrics() throws Throwable {
+        moduleProvider = Mockito.mock(CoreModuleProvider.class);
+        moduleManager = new MockModuleManager() {
+            @Override
+            protected void init() {
+                register(CoreModule.NAME, () -> new MockModuleProvider() {
+                    @Override
+                    protected void register() {
+                        registerServiceImplementation(NamingControl.class, new NamingControl(
+                                512, 512, 512, new EndpointNameGrouping()));
+                    }
+                });
+                register(TelemetryModule.NAME, () -> new MockModuleProvider() {
+                    @Override
+                    protected void register() {
+                        registerServiceImplementation(MetricsCreator.class, new MetricsCreatorNoop());
+                    }
+                });
+            }
+        };
+
+        // prepare the context
+        meterSystem = Mockito.mock(MeterSystem.class);
+        Whitebox.setInternalState(MetricsStreamProcessor.class, "PROCESSOR",
+                Mockito.spy(MetricsStreamProcessor.getInstance()));
+        doNothing().when(MetricsStreamProcessor.getInstance()).create(any(), (StreamDefinition) any(), any());
+        CoreModule coreModule = Mockito.spy(CoreModule.class);
+
+        Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider);
+        when(moduleProvider.getService(MeterSystem.class)).thenReturn(meterSystem);
+
+        telegrafServiceHandler = buildTelegrafServiceHandler();
+    }
+
+    protected TelegrafServiceHandler buildTelegrafServiceHandler() throws Exception {
+        // Capture every value that is passed to the meter system
+        doAnswer(invocationOnMock -> {
+            values.add(invocationOnMock.getArgument(0, AcceptableValue.class));
+            return null;
+        }).when(meterSystem).doStreamingCalculation(any());
+
+        // load context
+        List<Rule> telegrafConfigs = Rules.loadRules(TelegrafModuleConfig.CONFIG_PATH, Arrays.asList("vm"));
+        return new TelegrafServiceHandler(moduleManager, meterSystem, telegrafConfigs);
+    }
+
+    private TelegrafData assertTelegrafJSONConvert(String jsonMessage) throws Exception {
+        Assert.assertNotNull(jsonMessage);
+        ObjectMapper mapper = new ObjectMapper();
+        TelegrafData telegrafData = mapper.readValue(jsonMessage, TelegrafData.class);
+        Assert.assertNotNull(telegrafData);
+        return telegrafData;
+    }
+
+    private void assertSample(Sample sample, String... name) {
+        Assert.assertNotNull(sample);
+        Assert.assertTrue(Arrays.asList(name).contains(sample.getName()));
+        Assert.assertEquals(ImmutableMap.copyOf(Collections.singletonMap("host", "localHost")), sample.getLabels());
+        Assert.assertTrue(sample.getValue() >= 0);
+        Assert.assertTrue(sample.getTimestamp() >= 0);
+    }
+
+    private void assertConvertToSample(TelegrafData telegrafData, int totalSamplesPerMetrics, String... name) {
+        Assert.assertNotNull(telegrafData);
+        List<TelegrafDatum> metrics = telegrafData.getMetrics();
+        Assert.assertNotNull(metrics);
+        for (TelegrafDatum t : metrics) {
+            boolean convert = false;
+            List<Sample> samples = telegrafServiceHandler.convertTelegraf(t);
+            Assert.assertEquals(totalSamplesPerMetrics, samples.size());
+            for (Sample s : samples) {
+                assertSample(s, name);
+                convert = true;
+            }
+            if (!convert) {
+                // Fail when the metric produced no samples
+                throw new AssertionError("Failed to convert json telegraf message to Sample.");
+            }
+        }
+    }
+
+    private void assertConvertToSampleFamily(TelegrafData telegrafData, int sampleFamilySize, int samplePerSampleFamily, String... name) {
+        Assert.assertNotNull(telegrafData);
+        List<ImmutableMap<String, SampleFamily>> sampleFamilyCollections = telegrafServiceHandler.convertSampleFamily(telegrafData);
+        Assert.assertNotNull(sampleFamilyCollections);
+        int actualSize = 0;
+        for (ImmutableMap<String, SampleFamily> samples : sampleFamilyCollections) {
+            samples.forEach((k, v) -> Assert.assertEquals(samplePerSampleFamily, v.samples.length));
+            actualSize += samples.size();
+        }
+        Assert.assertEquals(sampleFamilySize, actualSize);
+        sampleFamilyCollections.forEach(sampleFamily -> {
+            for (Map.Entry<String, SampleFamily> entry : sampleFamily.entrySet()) {
+                String key = entry.getKey();
+                SampleFamily value = entry.getValue();
+                boolean convert = false;
+                Assert.assertTrue(Arrays.asList(name).contains(key));
+                for (Sample s : value.samples) {
+                    assertSample(s, name);
+                    convert = true;
+                }
+                if (!convert) {
+                    // Fail when the sample family contains no samples
+                    throw new AssertionError("Failed to convert json telegraf message to Sample.");
+                }
+            }
+        });
+    }
+
+    @Test
+    public void testOneMemMetrics() throws Throwable {
+        String oneMemMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}]}";
+
+        TelegrafData telegrafData = assertTelegrafJSONConvert(oneMemMetrics);
+        String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+        assertConvertToSample(telegrafData, 5, metricsNames);
+        assertConvertToSampleFamily(telegrafData, 5, 1, metricsNames);
+    }
+
+    @Test
+    public void testMultipleMemMetrics() throws Throwable {
+        String threeMemMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"available\":6047563904,\"available_percent\":56.41215070500567,\"total\":27048549120,\"used\":340364409216,\"used_percent\":44.58454929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663491390}, " +
+                "{\"fields\":" +
+                "{\"available\":5047739904,\"available_percent\":43.41215070500567,\"total\":46078149120,\"used\":45030409216,\"used_percent\":23.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1453365320}]}";
+
+        TelegrafData telegrafData = assertTelegrafJSONConvert(threeMemMetrics);
+        String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+        assertConvertToSample(telegrafData, 5, metricsNames);
+        assertConvertToSampleFamily(telegrafData, 15, 1, metricsNames);
+    }
+
+    @Test
+    public void testOneCpuMetrics() throws Throwable {
+        String oneCpuMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}]}";
+
+        TelegrafData telegrafData = assertTelegrafJSONConvert(oneCpuMetrics);
+        String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+        assertConvertToSample(telegrafData, 4, metricsNames);
+        assertConvertToSampleFamily(telegrafData, 4, 1, metricsNames);
+    }
+
+    @Test
+    public void testMultipleCpuMetrics() throws Throwable {
+        String twoCpuMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1453365320}]}";
+
+        TelegrafData telegrafData = assertTelegrafJSONConvert(twoCpuMetrics);
+        String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+        assertConvertToSample(telegrafData, 4, metricsNames);
+        assertConvertToSampleFamily(telegrafData, 8, 1, metricsNames);
+    }
+
+    @Test
+    public void testMultipleCpuMetricsWithSameTimestamp() throws Throwable {
+        String fourCpuMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":67.32710280373831,\"usage_irq\":0.5415264797507788,\"usage_system\":1.6533956386292835,\"usage_user\":76.54797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":74.32710280373831,\"usage_irq\":1.2535264797507788,\"usage_system\":4.5633956386292835,\"usage_user\":54.23797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}]}";
+
+        TelegrafData telegrafData = assertTelegrafJSONConvert(fourCpuMetrics);
+        String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+        assertConvertToSample(telegrafData, 4, metricsNames);
+        assertConvertToSampleFamily(telegrafData, 4, 4, metricsNames);
+    }
+
+    @Test
+    public void testInvalidJSONConvert() {
+        String invalidMetrics = "This is an invalid metrics message";
+
+        Assert.assertThrows("Expected JsonParseException to be thrown, but it wasn't.",
+                JsonParseException.class, () -> assertTelegrafJSONConvert(invalidMetrics));
+    }
+
+    @Test
+    public void testInvalidSampleNames() {
+        String oneCpuMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}]}";
+
+        Assert.assertThrows("Expected AssertionError to be thrown, but it wasn't.",
+                AssertionError.class, () -> {
+                    TelegrafData telegrafData = assertTelegrafJSONConvert(oneCpuMetrics);
+                    String[] wrongMetricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+                    assertConvertToSample(telegrafData, 4, wrongMetricsNames);
+                });
+    }
+
+    @Test
+    public void testInvalidSampleFamilyNames() {
+        String oneMemMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}]}";
+
+        Assert.assertThrows("Expected AssertionError to be thrown, but it wasn't.",
+                AssertionError.class, () -> {
+                    TelegrafData telegrafData = assertTelegrafJSONConvert(oneMemMetrics);
+                    String[] wrongMetricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+                    assertConvertToSampleFamily(telegrafData, 5, 1, wrongMetricsNames);
+                });
+    }
+
+    @Test
+    public void testWrongSampleNumbers() {
+        String twoCpuMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1453365320}]}";
+
+        Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+                AssertionError.class, () -> {
+                    TelegrafData telegrafData = assertTelegrafJSONConvert(twoCpuMetrics);
+                    String[] metricsNames = {"cpu_usage_idle", "cpu_usage_irq", "cpu_usage_system", "cpu_usage_user"};
+                    assertConvertToSample(telegrafData, 6, metricsNames);
+                });
+    }
+
+    @Test
+    public void testWrongSampleFamilySize() {
+        String threeMemMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"available\":6047563904,\"available_percent\":56.41215070500567,\"total\":27048549120,\"used\":340364409216,\"used_percent\":44.58454929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663491390}, " +
+                "{\"fields\":" +
+                "{\"available\":5047739904,\"available_percent\":43.41215070500567,\"total\":46078149120,\"used\":45030409216,\"used_percent\":23.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1453365320}]}";
+
+        Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+                AssertionError.class, () -> {
+                    TelegrafData telegrafData = assertTelegrafJSONConvert(threeMemMetrics);
+                    String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+                    assertConvertToSample(telegrafData, 5, metricsNames);
+                    assertConvertToSampleFamily(telegrafData, 3, 1, metricsNames);
+                });
+    }
+
+    @Test
+    public void testWrongSampleNumbersOfSampleFamily() {
+        String threeMemMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"available\":6047739904,\"available_percent\":35.41215070500567,\"total\":17078149120,\"used\":11030409216,\"used_percent\":64.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"available\":6047563904,\"available_percent\":56.41215070500567,\"total\":27048549120,\"used\":340364409216,\"used_percent\":44.58454929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663491390}, " +
+                "{\"fields\":" +
+                "{\"available\":5047739904,\"available_percent\":43.41215070500567,\"total\":46078149120,\"used\":45030409216,\"used_percent\":23.58784929499433}," +
+                "\"name\":\"mem\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1453365320}]}";
+
+        Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+                AssertionError.class, () -> {
+                    TelegrafData telegrafData = assertTelegrafJSONConvert(threeMemMetrics);
+                    String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+                    assertConvertToSample(telegrafData, 5, metricsNames);
+                    assertConvertToSampleFamily(telegrafData, 15, 2, metricsNames);
+                });
+    }
+
+    @Test
+    public void testWrongSampleFamilySizeWithSameTimestamp() {
+        String fourCpuMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":67.32710280373831,\"usage_irq\":0.5415264797507788,\"usage_system\":1.6533956386292835,\"usage_user\":76.54797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":74.32710280373831,\"usage_irq\":1.2535264797507788,\"usage_system\":4.5633956386292835,\"usage_user\":54.23797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}]}";
+
+        Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+                AssertionError.class, () -> {
+                    TelegrafData telegrafData = assertTelegrafJSONConvert(fourCpuMetrics);
+                    String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+                    assertConvertToSample(telegrafData, 5, metricsNames);
+                    assertConvertToSampleFamily(telegrafData, 3, 4, metricsNames);
+                });
+    }
+
+    @Test
+    public void testWrongSampleNumbersOfSampleFamilyWithSameTimestamp() {
+        String fourCpuMetrics = "{\"metrics\":" +
+                "[{\"fields\":" +
+                "{\"usage_idle\":95.32710280373831,\"usage_irq\":0.3115264797507788,\"usage_system\":1.7133956386292835,\"usage_user\":2.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":67.32710280373831,\"usage_irq\":0.5415264797507788,\"usage_system\":1.6533956386292835,\"usage_user\":76.54797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":74.32710280373831,\"usage_irq\":1.2535264797507788,\"usage_system\":4.5633956386292835,\"usage_user\":54.23797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}, " +
+                "{\"fields\":" +
+                "{\"usage_idle\":45.32710280373831,\"usage_irq\":0.4515344797507788,\"usage_system\":3.4533956386292835,\"usage_user\":45.64797507788162}," +
+                "\"name\":\"cpu\"," +
+                "\"tags\":{\"host\":\"localHost\"}," +
+                "\"timestamp\":1663391390}]}";
+
+        Assert.assertThrows("Expected AssertionError to throw, but it didn't.",
+                AssertionError.class, () -> {
+                    TelegrafData telegrafData = assertTelegrafJSONConvert(fourCpuMetrics);
+                    String[] metricsNames = {"mem_available", "mem_available_percent", "mem_total", "mem_used", "mem_used_percent"};
+                    assertConvertToSample(telegrafData, 5, metricsNames);
+                    assertConvertToSampleFamily(telegrafData, 4, 2, metricsNames);
+                });
+    }
+
+}
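
Reviewer note: the fixtures in this test class suggest that each Telegraf field becomes one sample named <metric name>_<field name> (for example, metric "mem" with field "available" is expected as "mem_available"), so one metric with five fields should yield five samples. The sketch below illustrates only that naming rule as inferred from the fixtures. TelegrafNamingSketch and sampleNames are hypothetical names and are not part of the plugin.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TelegrafNamingSketch {
    // Hypothetical helper (not the plugin's code): joins the Telegraf metric
    // name and each field name with an underscore, mirroring the sample names
    // that the tests above expect.
    public static List<String> sampleNames(String metricName, List<String> fieldNames) {
        List<String> names = new ArrayList<>();
        for (String field : fieldNames) {
            names.add(metricName + "_" + field);
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList(
            "available", "available_percent", "total", "used", "used_percent");
        // One "mem" metric with five fields gives five sample names.
        System.out.println(sampleNames("mem", fields));
    }
}
```

Read this way, the negative tests above pass names from the wrong family (for example "mem_*" names against a "cpu" payload) or a wrong count, and expect the assertion helpers to fail.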
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleManager.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleManager.java
new file mode 100644
index 0000000000..6db9734b25
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.mock;
+
+import com.google.common.collect.Maps;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleNotFoundRuntimeException;
+import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
+
+import java.util.Map;
+
+public abstract class MockModuleManager extends ModuleManager {
+    private final Map<String, ModuleProviderHolder> moduleProviderHolderMap = Maps.newHashMap();
+
+    public MockModuleManager() {
+        init();
+    }
+
+    protected abstract void init();
+
+    protected void register(String name, ModuleProviderHolder provider) {
+        moduleProviderHolderMap.put(name, provider);
+    }
+
+    @Override
+    public boolean has(String moduleName) {
+        return moduleProviderHolderMap.containsKey(moduleName);
+    }
+
+    @Override
+    public ModuleProviderHolder find(String moduleName) throws ModuleNotFoundRuntimeException {
+        if (!moduleProviderHolderMap.containsKey(moduleName)) {
+            throw new ModuleNotFoundRuntimeException("ModuleProviderHolder[" + moduleName + "] cannot be found in MOCK.");
+        }
+        return moduleProviderHolderMap.get(moduleName);
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleProvider.java
new file mode 100644
index 0000000000..5e51194aec
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/telegraf/mock/MockModuleProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.telegraf.mock;
+
+import com.google.common.collect.Maps;
+import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+
+import java.util.Map;
+
+public abstract class MockModuleProvider implements ModuleServiceHolder {
+    protected Map<Class<? extends Service>, Service> serviceMap = Maps.newHashMap();
+
+    public MockModuleProvider() {
+        register();
+    }
+
+    protected abstract void register();
+
+    @Override
+    public void registerServiceImplementation(final Class<? extends Service> serviceType,
+                                              final Service service) throws ServiceNotProvidedException {
+        serviceMap.put(serviceType, service);
+    }
+
+    @Override
+    public <T extends Service> T getService(final Class<T> serviceType) throws ServiceNotProvidedException {
+        return serviceType.cast(serviceMap.get(serviceType));
+    }
+}
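
Reviewer note: both mock classes above call an abstract hook from their constructor so that each test can declare what it registers. The sketch below shows that pattern in isolation, with plain Java types standing in for the SkyWalking ones. Registry, newRegistry, and the "core" name are hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class MockRegistrySketch {
    // Base class in the style of MockModuleManager: the constructor calls an
    // abstract hook, and subclasses register their entries inside that hook.
    public abstract static class Registry {
        private final Map<String, Object> holders = new HashMap<>();

        public Registry() {
            init(); // runs before any subclass field initializer
        }

        protected abstract void init();

        protected void register(String name, Object holder) {
            holders.put(name, holder);
        }

        public boolean has(String name) {
            return holders.containsKey(name);
        }

        public Object find(String name) {
            Object holder = holders.get(name);
            if (holder == null) {
                throw new IllegalStateException("Holder[" + name + "] cannot be found in MOCK.");
            }
            return holder;
        }
    }

    // Hypothetical usage: an anonymous subclass registers one named holder.
    public static Registry newRegistry() {
        return new Registry() {
            @Override
            protected void init() {
                register("core", "core-holder");
            }
        };
    }

    public static void main(String[] args) {
        Registry registry = newRegistry();
        System.out.println(registry.has("core"));
        System.out.println(registry.has("telegraf"));
    }
}
```

One consequence of this design is that the hook runs before the subclass's own fields are initialized, so an init() or register() override should rely only on constants or on state owned by the base class.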
diff --git a/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/resources/telegraf-rules/vm.yaml b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/resources/telegraf-rules/vm.yaml
new file mode 100644
index 0000000000..e47c98a7d6
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-telegraf-receiver-plugin/src/test/resources/telegraf-rules/vm.yaml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+expSuffix: service(['host'], Layer.OS_LINUX)
+metricPrefix: meter_vm
+metricsRules:
+
+  # cpu
+  - name: cpu_total_percentage
+    exp: cpu_usage_active.tagEqual('cpu', 'cpu-total')
+  - name: cpu_average_used
+    exp: cpu_usage_active.tagNotEqual('cpu', 'cpu-total').avg(['host', 'cpu'])
+  - name: cpu_load1
+    exp: system_load1
+  - name: cpu_load5
+    exp: system_load5
+  - name: cpu_load15
+    exp: system_load15
+
+  # memory
+  - name: memory_total
+    exp: mem_total
+  - name: memory_available
+    exp: mem_available
+  - name: memory_used
+    exp: mem_used
+
+  # swap
+  - name: memory_swap_free
+    exp: mem_swap_free
+  - name: memory_swap_total
+    exp: mem_swap_total
+  - name: memory_swap_percentage
+    exp: 100 - ((mem_swap_free / mem_swap_total) * 100)
+
+  # node filesystem
+  - name: filesystem_percentage
+    exp: disk_used_percent.avg(['host','device'])
+
+  # node disk
+  - name: disk_read
+    exp: diskio_read_bytes.rate('PT1M')
+  - name: disk_written
+    exp: diskio_write_bytes.rate('PT1M')
+
+  # node net
+  - name: network_receive
+    exp: net_bytes_recv.irate()
+  - name: network_transmit
+    exp: net_bytes_sent.irate()
+
+  # node netstat
+  - name: tcp_curr_estab
+    exp: netstat_tcp_established
+  - name: tcp_tw
+    exp: netstat_tcp_time_wait
+  - name: tcp_alloc
+    exp: netstat_tcp_listen
+  - name: udp_inuse
+    exp: netstat_udp_socket
\ No newline at end of file
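
Reviewer note: the memory_swap_percentage rule above computes the used share of swap as 100 - ((mem_swap_free / mem_swap_total) * 100). The sketch below works through that arithmetic in plain Java. The input values are made up for illustration, and SwapPercentageSketch is a hypothetical name. How MAL itself treats a zero swap total is not shown in this commit, so the second call only demonstrates what plain double arithmetic does in that case.

```java
public class SwapPercentageSketch {
    // Mirrors the rule expression: 100 - ((mem_swap_free / mem_swap_total) * 100)
    public static double swapUsedPercentage(double swapFree, double swapTotal) {
        return 100 - ((swapFree / swapTotal) * 100);
    }

    public static void main(String[] args) {
        // 1024 free out of 4096 total: 25% free, so 75% used.
        System.out.println(swapUsedPercentage(1024, 4096));
        // With doubles, 0 / 0 is NaN, so a host without swap yields NaN here.
        System.out.println(Double.isNaN(swapUsedPercentage(0, 0)));
    }
}
```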
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 219b1dadeb..f54ea1b4ec 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -166,6 +166,11 @@
             <artifactId>skywalking-ebpf-receiver-plugin</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>skywalking-telegraf-receiver-plugin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!-- receiver module -->
 
         <!-- fetcher module -->
@@ -314,6 +319,7 @@
                         <exclude>zabbix-rules/</exclude>
                         <exclude>lal/</exclude>
                         <exclude>log-mal-rules/</exclude>
+                        <exclude>telegraf-rules/</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 06e7eb565e..4d7ad03721 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -536,3 +536,8 @@ receiver-event:
 receiver-ebpf:
   selector: ${SW_RECEIVER_EBPF:default}
   default:
+
+receiver-telegraf:
+  selector: ${SW_RECEIVER_TELEGRAF:default}
+  default:
+    activeFiles: ${SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm}
\ No newline at end of file
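
Reviewer note: the new application.yml entries use the ${NAME:default} form, so activeFiles falls back to "vm" when SW_RECEIVER_TELEGRAF_ACTIVE_FILES is not set. The sketch below is a simplified illustration of that lookup, not SkyWalking's actual placeholder resolver, which may handle edge cases differently. PlaceholderSketch and resolve are hypothetical names.

```java
import java.util.Map;

public class PlaceholderSketch {
    // Resolves a value of the form ${NAME:default} against the given environment.
    // Values that are not in that form are returned unchanged.
    public static String resolve(String value, Map<String, String> env) {
        if (!value.startsWith("${") || !value.endsWith("}")) {
            return value;
        }
        String body = value.substring(2, value.length() - 1);
        int colon = body.indexOf(':');
        String name = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? "" : body.substring(colon + 1);
        String fromEnv = env.get(name);
        return fromEnv != null ? fromEnv : fallback;
    }

    public static void main(String[] args) {
        // Prints the environment value if the variable is set, otherwise "vm".
        System.out.println(resolve("${SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm}", System.getenv()));
    }
}
```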
diff --git a/oap-server/server-starter/src/main/resources/telegraf-rules/vm.yaml b/oap-server/server-starter/src/main/resources/telegraf-rules/vm.yaml
new file mode 100644
index 0000000000..e47c98a7d6
--- /dev/null
+++ b/oap-server/server-starter/src/main/resources/telegraf-rules/vm.yaml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+expSuffix: service(['host'], Layer.OS_LINUX)
+metricPrefix: meter_vm
+metricsRules:
+
+  # cpu
+  - name: cpu_total_percentage
+    exp: cpu_usage_active.tagEqual('cpu', 'cpu-total')
+  - name: cpu_average_used
+    exp: cpu_usage_active.tagNotEqual('cpu', 'cpu-total').avg(['host', 'cpu'])
+  - name: cpu_load1
+    exp: system_load1
+  - name: cpu_load5
+    exp: system_load5
+  - name: cpu_load15
+    exp: system_load15
+
+  # memory
+  - name: memory_total
+    exp: mem_total
+  - name: memory_available
+    exp: mem_available
+  - name: memory_used
+    exp: mem_used
+
+  # swap
+  - name: memory_swap_free
+    exp: mem_swap_free
+  - name: memory_swap_total
+    exp: mem_swap_total
+  - name: memory_swap_percentage
+    exp: 100 - ((mem_swap_free / mem_swap_total) * 100)
+
+  # node filesystem
+  - name: filesystem_percentage
+    exp: disk_used_percent.avg(['host','device'])
+
+  # node disk
+  - name: disk_read
+    exp: diskio_read_bytes.rate('PT1M')
+  - name: disk_written
+    exp: diskio_write_bytes.rate('PT1M')
+
+  # node net
+  - name: network_receive
+    exp: net_bytes_recv.irate()
+  - name: network_transmit
+    exp: net_bytes_sent.irate()
+
+  # node netstat
+  - name: tcp_curr_estab
+    exp: netstat_tcp_established
+  - name: tcp_tw
+    exp: netstat_tcp_time_wait
+  - name: tcp_alloc
+    exp: netstat_tcp_listen
+  - name: udp_inuse
+    exp: netstat_udp_socket
\ No newline at end of file
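
Reviewer note: the e2e case in this commit queries names such as meter_vm_memory_used and meter_vm_cpu_load1, which correspond to the metricPrefix (meter_vm) joined to a rule name (memory_used, cpu_load1) with an underscore. The sketch below only spells out that composition as it can be read from the two files. MetricNameSketch and fullName are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

public class MetricNameSketch {
    // Joins the rule file's metricPrefix and a rule name with an underscore.
    public static String fullName(String metricPrefix, String ruleName) {
        return metricPrefix + "_" + ruleName;
    }

    public static void main(String[] args) {
        // Rule names taken from vm.yaml above.
        List<String> ruleNames = Arrays.asList("memory_used", "memory_total", "cpu_load1");
        for (String ruleName : ruleNames) {
            System.out.println(fullName("meter_vm", ruleName));
        }
    }
}
```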
diff --git a/test/e2e-v2/cases/vm/telegraf/docker-compose.yml b/test/e2e-v2/cases/vm/telegraf/docker-compose.yml
new file mode 100644
index 0000000000..654bb1d9a1
--- /dev/null
+++ b/test/e2e-v2/cases/vm/telegraf/docker-compose.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+  oap:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: oap
+    environment:
+      SW_RECEIVER_TELEGRAF: default
+      SW_RECEIVER_TELEGRAF_ACTIVE_FILES: vm
+    ports:
+      - 12800
+
+  telegraf:
+    image: telegraf:1.24
+    networks:
+      - e2e
+    volumes:
+      - ./telegraf.conf:/etc/telegraf/telegraf.conf
+    depends_on:
+      oap:
+        condition: service_healthy
+
+networks:
+  e2e:
\ No newline at end of file
diff --git a/test/e2e-v2/cases/vm/telegraf/e2e.yaml b/test/e2e-v2/cases/vm/telegraf/e2e.yaml
new file mode 100644
index 0000000000..a897c556cc
--- /dev/null
+++ b/test/e2e-v2/cases/vm/telegraf/e2e.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file shows how to write an e2e configuration file and can also be used for testing.
+
+setup:
+  env: compose
+  file: docker-compose.yml
+  timeout: 20m
+  init-system-environment: ../../../script/env
+  steps:
+    - name: set PATH
+      command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
+    - name: install yq
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+    - name: install swctl
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+verify:
+  retry:
+    count: 20
+    interval: 3s
+  cases:
+    # service list
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
+      expected:  ../expected/service.yml
+    # linear metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_memory_used --service-name=vm-service |yq e 'to_entries' -
+      expected:  ../expected/metrics-has-value.yml
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_memory_total --service-name=vm-service |yq e 'to_entries' -
+      expected:  ../expected/metrics-has-value.yml
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_memory_available --service-name=vm-service |yq e 'to_entries' -
+      expected:  ../expected/metrics-has-value.yml
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_total_percentage --service-name=vm-service |yq e 'to_entries' -
+      expected:  ../expected/metrics-has-value.yml
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_load1 --service-name=vm-service |yq e 'to_entries' -
+      expected:  ../expected/metrics-has-value.yml
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_load5 --service-name=vm-service |yq e 'to_entries' -
+      expected:  ../expected/metrics-has-value.yml
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_vm_cpu_load15 --service-name=vm-service |yq e 'to_entries' -
+      expected:  ../expected/metrics-has-value.yml
\ No newline at end of file
diff --git a/test/e2e-v2/cases/vm/telegraf/telegraf.conf b/test/e2e-v2/cases/vm/telegraf/telegraf.conf
new file mode 100644
index 0000000000..0b8295f6f9
--- /dev/null
+++ b/test/e2e-v2/cases/vm/telegraf/telegraf.conf
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Global tags can be specified here in key="value" format.
+[global_tags]
+  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
+  # rack = "1a"
+  ## Environment variables can be used as tags, and throughout the config file
+  # user = "$USER"
+
+
+# Configuration for telegraf agent
+[agent]
+  ## Default data collection interval for all inputs
+  interval = "10s"
+  ## Rounds collection interval to 'interval'
+  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
+  round_interval = true
+
+  ## Telegraf will send metrics to outputs in batches of at most
+  ## metric_batch_size metrics.
+  ## This controls the size of writes that Telegraf sends to output plugins.
+  metric_batch_size = 1000
+
+  ## Maximum number of unwritten metrics per output.  Increasing this value
+  ## allows for longer periods of output downtime without dropping metrics at the
+  ## cost of higher maximum memory usage.
+  metric_buffer_limit = 10000
+
+  ## Collection jitter is used to jitter the collection by a random amount.
+  ## Each plugin will sleep for a random time within jitter before collecting.
+  ## This can be used to avoid many plugins querying things like sysfs at the
+  ## same time, which can have a measurable effect on the system.
+  collection_jitter = "0s"
+
+  ## Collection offset is used to shift the collection by the given amount.
+  ## This can be used to avoid many plugins querying constrained devices
+  ## at the same time by manually scheduling them in time.
+  # collection_offset = "0s"
+
+  ## Default flushing interval for all outputs. Maximum flush_interval will be
+  ## flush_interval + flush_jitter
+  flush_interval = "3s"
+  ## Jitter the flush interval by a random amount. This is primarily to avoid
+  ## large write spikes for users running a large number of telegraf instances.
+  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
+  flush_jitter = "0s"
+
+  ## Collected metrics are rounded to the precision specified. Precision is
+  ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
+  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
+  ##
+  ## By default or when set to "0s", precision will be set to the same
+  ## timestamp order as the collection interval, with the maximum being 1s:
+  ##   ie, when interval = "10s", precision will be "1s"
+  ##       when interval = "250ms", precision will be "1ms"
+  ##
+  ## Precision will NOT be used for service inputs. It is up to each individual
+  ## service input to set the timestamp at the appropriate precision.
+  precision = "0s"
+
+  ## Override default hostname, if empty use os.Hostname()
+  hostname = "vm-service"
+  ## If set to true, do not set the "host" tag in the telegraf agent.
+  omit_hostname = false
+
+
+###############################################################################
+#                            OUTPUT PLUGINS                                   #
+###############################################################################
+
+
+# A plugin that can transmit metrics over HTTP
+[[outputs.http]]
+  ## URL is the address to send metrics to
+  url = "http://oap:12800/telegraf"
+
+  ## Timeout for HTTP message
+  timeout = "10s"
+
+  ## HTTP method, one of: "POST" or "PUT"
+  method = "POST"
+
+  ## Data format to output.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
+  data_format = "json"
+
+
+[[outputs.file]]
+  ## Files to write to, "stdout" is a specially handled file.
+  files = ["stdout", "/tmp/metrics.out"]
+
+###############################################################################
+#                            INPUT PLUGINS                                    #
+###############################################################################
+
+# Read metrics about cpu usage
+[[inputs.cpu]]
+  ## Whether to report per-cpu stats or not
+  percpu = true
+  ## Whether to report total system cpu stats or not
+  totalcpu = true
+  ## If true, collect raw CPU time metrics
+  collect_cpu_time = false
+  ## If true, compute and report the sum of all non-idle CPU states
+  report_active = true
+
+
+# Read metrics about memory usage
+[[inputs.mem]]
+  # no configuration
+
+
+[[inputs.system]]
+  # no configuration
+
+[[inputs.disk]]
+  # no configuration
+
+[[inputs.diskio]]
+  # no configuration
+
+[[inputs.net]]
+  ## By default, telegraf gathers stats from any up interface (excluding loopback)
+  ## Setting interfaces will tell it to gather these explicit interfaces,
+  ## regardless of status. When specifying an interface, glob-style
+  ## patterns are also supported.
+  ##
+  # interfaces = ["eth*", "enp0s[0-1]", "lo"]
+  ##
+  ## On linux systems telegraf also collects protocol stats.
+  ## Setting ignore_protocol_stats to true will skip reporting of protocol metrics.
+  ##
+  # ignore_protocol_stats = false
+  ##
+
+[[inputs.netstat]]
\ No newline at end of file
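
Reviewer note: with the configuration above, Telegraf posts JSON batches to the /telegraf path on the OAP HTTP port, in the {"metrics":[...]} shape used by the unit-test fixtures. The sketch below builds an equivalent request by hand with the JDK HTTP client (Java 11 or later), which can help when probing the receiver without a Telegraf agent. The payload values, the Content-Type header, and the class and method names are assumptions made for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class TelegrafPostSketch {
    // A hand-written payload in the same shape as the test fixtures.
    // The field value and timestamp are illustrative only.
    public static String samplePayload() {
        return "{\"metrics\":[{\"fields\":{\"load1\":0.5},"
            + "\"name\":\"system\","
            + "\"tags\":{\"host\":\"vm-service\"},"
            + "\"timestamp\":1663391390}]}";
    }

    // Builds (but does not send) a POST to <baseUrl>/telegraf, matching the
    // method and timeout configured in telegraf.conf.
    public static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/telegraf"))
            .timeout(Duration.ofSeconds(10))
            .header("Content-Type", "application/json") // assumed header
            .POST(HttpRequest.BodyPublishers.ofString(samplePayload()))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://oap:12800");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send it, the request can be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running OAP instance. The host name "oap" resolves only inside the compose network, so a local run would need a different base URL.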