You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/10/21 00:04:34 UTC

[skywalking] branch master updated: Support export `Trace` and `Log` through Kafka. (#9817)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new d98775790a Support export `Trace` and `Log` through Kafka. (#9817)
d98775790a is described below

commit d98775790aad32c6dc3a06bba6ec96fa8d10c2ad
Author: Wan Kai <wa...@foxmail.com>
AuthorDate: Fri Oct 21 08:04:18 2022 +0800

    Support export `Trace` and `Log` through Kafka. (#9817)
---
 .github/workflows/skywalking.yaml                  |   3 +
 docs/en/changes/changes.md                         |   1 +
 docs/en/setup/backend/configuration-vocabulary.md  |  11 +-
 docs/en/setup/backend/exporter.md                  | 121 +++++++++++++
 docs/en/setup/backend/metrics-exporter.md          |  82 +--------
 docs/menu.yml                                      |   4 +-
 oap-server-bom/pom.xml                             |  14 ++
 oap-server/exporter/pom.xml                        |   6 +-
 ...ExporterProvider.java => ExporterProvider.java} |  44 +++--
 ...PCExporterSetting.java => ExporterSetting.java} |  17 +-
 ...{GRPCExporter.java => GRPCMetricsExporter.java} |  34 ++--
 .../provider/kafka/KafkaExportProducer.java        |  54 ++++++
 .../provider/kafka/log/KafkaLogExporter.java       | 189 +++++++++++++++++++++
 .../provider/kafka/trace/KafkaTraceExporter.java   | 130 ++++++++++++++
 ...alking.oap.server.library.module.ModuleProvider |   2 +-
 .../provider/grpc/GRPCExporterProviderTest.java    |  20 ++-
 .../exporter/provider/grpc/GRPCExporterTest.java   |  12 +-
 oap-server/pom.xml                                 |   3 -
 ...{ExportWorker.java => ExportMetricsWorker.java} |   9 +-
 .../core/analysis/worker/ExportRecordWorker.java   |  64 +++++++
 .../analysis/worker/MetricsStreamProcessor.java    |   2 +-
 .../analysis/worker/RecordPersistentWorker.java    |   6 +-
 .../analysis/worker/RecordStreamProcessor.java     |   3 +-
 .../oap/server/core/exporter/ExporterModule.java   |   5 +-
 .../oap/server/core/exporter/ExporterService.java} |  20 +--
 .../{ExporterModule.java => LogExportService.java} |  18 +-
 .../core/exporter/MetricValuesExportService.java   |   4 +-
 ...ExporterModule.java => TraceExportService.java} |  18 +-
 .../kafka-fetcher-plugin/pom.xml                   |   5 +-
 .../zipkin-receiver-plugin/pom.xml                 |   1 -
 .../src/main/resources/application.yml             |  16 +-
 .../e2e-v2/cases/exporter/kafka/docker-compose.yml |  93 ++++++++++
 test/e2e-v2/cases/exporter/kafka/e2e.yaml          |  47 +++++
 .../cases/exporter/kafka/expected/result.yml       |   6 +-
 .../cases/exporter/kafka/exporter-cases.yaml       |  34 ++++
 35 files changed, 913 insertions(+), 185 deletions(-)

diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index c94e4c5188..57a9f363ed 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -584,6 +584,9 @@ jobs:
             config: test/e2e-v2/cases/zipkin/mysql/sharding/e2e.yaml
           - name: APISIX metrics
             config: test/e2e-v2/cases/apisix/otel-collector/e2e.yaml
+
+          - name: Exporter Kafka
+            config: test/e2e-v2/cases/exporter/kafka/e2e.yaml
     steps:
       - uses: actions/checkout@v3
         with:
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1fc02be16f..cee93a7d7c 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -63,6 +63,7 @@
 * Optimize the query time of tasks in ProfileTaskCache.
 * Fix metrics was put into wrong slot of the window in the alerting kernel.
 * Support `sumPerMinLabeled` in `MAL`.
+* Support exporting `Trace` and `Log` through Kafka.
 
 #### UI
 
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index eb53ab9924..efa441e5f9 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -292,8 +292,15 @@ The Configuration Vocabulary lists all available configurations provided by `app
 | -                       | -             | password                                                                                                                                                                 | Nacos Auth password.                                                                                                                                                                                                                                                                  [...]
 | -                       | -             | accessKey                                                                                                                                                                | Nacos Auth accessKey.                                                                                                                                                                                                                                                                 [...]
 | -                       | -             | secretKey                                                                                                                                                                | Nacos Auth secretKey.                                                                                                                                                                                                                                                                 [...]
-| exporter                | grpc          | targetHost                                                                                                                                                               | The host of target gRPC server for receiving export data.                                                                                                                                                                                                                             [...]
-| -                       | -             | targetPort                                                                                                                                                               | The port of target gRPC server for receiving export data.                                                                                                                                                                                                                             [...]
+| exporter                | default       | enableGRPCMetrics                                                                                                                                                        | Enable gRPC metrics exporter.                                                                                                                                                                                                                                                         [...]
+| -                       | -             | gRPCTargetHost                                                                                                                                                           | The host of target gRPC server for receiving export data                                                                                                                                                                                                                              [...]
+| -                       | -             | gRPCTargetPort                                                                                                                                                           | The port of target gRPC server for receiving export data.                                                                                                                                                                                                                             [...]
+| -                       | -             | enableKafkaTrace                                                                                                                                                         | Enable Kafka trace exporter.                                                                                                                                                                                                                                                          [...]
+| -                       | -             | enableKafkaLog                                                                                                                                                           | Enable Kafka log exporter.                                                                                                                                                                                                                                                            [...]
+| -                       | -             | kafkaBootstrapServers                                                                                                                                                    | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.                                                                                                                                                                                        [...]
+| -                       | -             | kafkaProducerConfig                                                                                                                                                      | Kafka producer config, JSON format as Properties.                                                                                                                                                                                                                                     [...]
+| -                       | -             | kafkaTopicTrace                                                                                                                                                          | Kafka topic name for trace.                                                                                                                                                                                                                                                           [...]
+| -                       | -             | kafkaTopicLog                                                                                                                                                            | Kafka topic name for log.                                                                                                                                                                                                                                                             [...]
 | health-checker          | default       | checkIntervalSeconds                                                                                                                                                     | The period of checking OAP internal health status (in seconds).                                                                                                                                                                                                                       [...]
 | configuration-discovery | default       | disableMessageDigest                                                                                                                                                     | If true, agent receives the latest configuration every time, even without making any changes. By default, OAP uses the SHA512 message digest mechanism to detect changes in configuration.                                                                                            [...]
 | receiver-event          | default       | gRPC services that handle events data.                                                                                                                                   | -                                                                                                                                                                                                                                                                                     [...]
diff --git a/docs/en/setup/backend/exporter.md b/docs/en/setup/backend/exporter.md
new file mode 100644
index 0000000000..468f42bef2
--- /dev/null
+++ b/docs/en/setup/backend/exporter.md
@@ -0,0 +1,121 @@
+# Exporter
+SkyWalking provides the essential functions of observability, including metrics aggregation, trace, log, alerting, and profiling.
+In many real-world scenarios, users may want to forward their data to a 3rd party system for further in-depth analysis.
+**Exporter** has made that possible.
+
+The exporter is an independent module that has to be manually activated.
+
+Right now, we provide the following exporting channels:
+1. gRPC Exporter
+   - [Metrics](#metrics-grpc-exporter)
+2. Kafka Exporter
+   - [Trace](#trace-kafka-exporter)
+   - [Log](#log-kafka-exporter)
+
+## gRPC Exporter
+### Metrics gRPC Exporter
+Metrics gRPC exporter uses SkyWalking's native export service definition. Here is the proto definition: [metric-exporter.proto](https://github.com/apache/skywalking/blob/master/oap-server/exporter/src/main/proto/metric-exporter.proto).
+```proto
+service MetricExportService {
+    rpc export (stream ExportMetricValue) returns (ExportResponse) {
+    }
+
+    rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
+    }
+}
+```
+
+To activate the exporter, you should set `${SW_EXPORTER_ENABLE_GRPC_METRICS:true}` and configure the target gRPC server address.
+```yaml
+exporter:
+  default:
+    # gRPC exporter
+    enableGRPCMetrics: ${SW_EXPORTER_ENABLE_GRPC_METRICS:true}
+    gRPCTargetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
+    gRPCTargetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+    ...
+```
+
+- `gRPCTargetHost`:`gRPCTargetPort` is the expected target service address. You could set any gRPC server to receive the data.
+- The target gRPC service must already be up and reachable when the OAP starts; otherwise, the OAP startup may fail.
+
+#### Target exporter service
+1. Subscription implementation.
+Return the list of expected metric names, each with its event type (incremental or total). All names must match the OAL/MAL script definitions.
+Return an empty list if you want to export all metrics in the incremental event type.
+
+2. Export implementation.
+Stream service. All subscribed metrics will be sent here based on the OAP core schedule. Also, if the OAP is deployed as a cluster,
+this method will be called concurrently. For each metric value, use `#type` to decide whether to read `#longValue` or `#doubleValue`.
+
+## Kafka Exporter
+### Trace Kafka Exporter
+Trace Kafka exporter pushes messages to the Kafka broker, on the topic set by `kafkaTopicTrace` (`skywalking-trace` in the example below), to export traces. Here is the message format:
+```
+ProducerRecord<String, Bytes>
+Key: TraceSegmentId
+Value: Bytes of SegmentObject
+```
+
+The `SegmentObject` definition follows the protocol:
+[SkyWalking data collect protocol#Tracing.proto](https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto).
+```proto
+// The segment is a collection of spans. It includes all collected spans in a simple one request context, such as a HTTP request process.
+message SegmentObject {
+    string traceId = 1;
+    string traceSegmentId = 2;
+    repeated SpanObject spans = 3;
+    string service = 4;
+    string serviceInstance = 5;
+    bool isSizeLimited = 6;
+}
+```
+
+To activate the exporter, you should set `${SW_EXPORTER_ENABLE_KAFKA_TRACE:true}` and configure the Kafka server.
+```yaml
+exporter:
+  default:
+    # Kafka exporter
+    enableKafkaTrace: ${SW_EXPORTER_ENABLE_KAFKA_TRACE:true}
+    kafkaBootstrapServers: ${SW_EXPORTER_KAFKA_SERVERS:localhost:9092}
+    # Kafka producer config, JSON format as Properties.
+    kafkaProducerConfig: ${SW_EXPORTER_KAFKA_PRODUCER_CONFIG:""}
+    kafkaTopicTrace: ${SW_EXPORTER_KAFKA_TOPIC_TRACE:skywalking-trace}
+    ...
+```
+
+### Log Kafka Exporter
+Log Kafka exporter pushes messages to the Kafka broker, on the topic set by `kafkaTopicLog` (`skywalking-log` in the example below), to export logs. Here is the message format:
+```
+ProducerRecord<String, Bytes>
+Key: LogRecordId
+Value: Bytes of LogData
+```
+
+The `LogData` definition follows the protocol:
+[SkyWalking data collect protocol#Logging.proto](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto).
+```proto
+message LogData {
+    int64 timestamp = 1;
+    string service = 2;
+    string serviceInstance = 3;
+    string endpoint = 4;
+    LogDataBody body = 5;
+    TraceContext traceContext = 6;
+    LogTags tags = 7;
+    string layer = 8;
+}
+```
+
+To activate the exporter, you should set `${SW_EXPORTER_ENABLE_KAFKA_LOG:true}` and configure the Kafka server.
+```yaml
+exporter:
+  default:
+    # Kafka exporter
+    enableKafkaLog: ${SW_EXPORTER_ENABLE_KAFKA_LOG:true}
+    kafkaBootstrapServers: ${SW_EXPORTER_KAFKA_SERVERS:localhost:9092}
+    # Kafka producer config, JSON format as Properties.
+    kafkaProducerConfig: ${SW_EXPORTER_KAFKA_PRODUCER_CONFIG:""}
+    kafkaTopicLog: ${SW_EXPORTER_KAFKA_TOPIC_LOG:skywalking-log}
+    ...
+```
diff --git a/docs/en/setup/backend/metrics-exporter.md b/docs/en/setup/backend/metrics-exporter.md
index 2b497861ac..aa0dfcd879 100644
--- a/docs/en/setup/backend/metrics-exporter.md
+++ b/docs/en/setup/backend/metrics-exporter.md
@@ -1,81 +1 @@
-# Metrics Exporter
-SkyWalking provides the essential functions of metrics aggregation, alarm, and analysis. 
-In many real-world scenarios, users may want to forward their data to a 3rd party system for further in-depth analysis.
-**Metrics Exporter** has made that possible.
-
-The metrics exporter is an independent module that has to be manually activated.
-
-Right now, we provide the following exporters:
-1. gRPC exporter
-
-## gRPC exporter
-gRPC exporter uses SkyWalking's native exporter service definition. Here is the proto definition.
-```proto
-service MetricExportService {
-    rpc export (stream ExportMetricValue) returns (ExportResponse) {
-    }
-
-    rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
-    }
-}
-
-message ExportMetricValue {
-    string metricName = 1;
-    string entityName = 2;
-    string entityId = 3;
-    ValueType type = 4;
-    int64 timeBucket = 5;
-    int64 longValue = 6;
-    double doubleValue = 7;
-    repeated int64 longValues = 8;
-}
-
-message SubscriptionsResp {
-    repeated SubscriptionMetric metrics = 1;
-}
-
-message SubscriptionMetric {
-    string metricName = 1;
-    EventType eventType = 2;
-}
-
-enum ValueType {
-    LONG = 0;
-    DOUBLE = 1;
-    MULTI_LONG = 2;
-}
-
-enum EventType {
-    // The metrics aggregated in this bulk, not include the existing persistent data.
-    INCREMENT = 0;
-    // Final result of the metrics at this moment.
-    TOTAL = 1;
-}
-
-message SubscriptionReq {
-
-}
-
-message ExportResponse {
-}
-```
-
-To activate the exporter, you should add this into your `application.yml`
-```yaml
-exporter:
-  grpc:
-    targetHost: 127.0.0.1
-    targetPort: 9870
-```
-
-- `targetHost`:`targetPort` is the expected target service address. You could set any gRPC server to receive the data.
-- Target gRPC service needs to go on standby; otherwise, the OAP startup may fail.
-
-## Target exporter service 
-### Subscription implementation
-Return the expected metrics name list with event type (incremental or total). All names must match the OAL/MAL script definition. 
-Return empty list, if you want to export all metrics in the incremental event type.
-
-### Export implementation
-Stream service. All subscribed metrics will be sent here based on the OAP core schedule. Also, if the OAP is deployed as a cluster, 
-this method will be called concurrently. For metrics value, you need to follow `#type` to choose `#longValue` or `#doubleValue`.
+All SkyWalking exporter (metrics, trace, log) instructions have been moved [here](exporter.md).
diff --git a/docs/menu.yml b/docs/menu.yml
index 7b4dbb7b5f..167a2c11ab 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -133,8 +133,8 @@ catalog:
             path: "/en/setup/backend/on-demand-pod-log"
       - name: "Extension"
         catalog:
-          - name: "Metrics Exporter"
-            path: "/en/setup/backend/metrics-exporter"
+          - name: "Exporter"
+            path: "/en/setup/backend/exporter"
           - name: "Dynamic Configuration"
             path: "/en/setup/backend/dynamic-config"
       - name: "UI Setup"
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index b5b6f4e3f7..97ad598fef 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -73,6 +73,8 @@
         <httpcore.version>4.4.13</httpcore.version>
         <commons-compress.version>1.21</commons-compress.version>
         <banyandb-java-client.version>0.1.0</banyandb-java-client.version>
+        <kafka-clients.version>2.4.1</kafka-clients.version>
+        <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
     </properties>
 
     <dependencyManagement>
@@ -554,6 +556,18 @@
                 <artifactId>commons-compress</artifactId>
                 <version>${commons-compress.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka-clients.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka-test</artifactId>
+                <version>${spring-kafka-test.version}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>
diff --git a/oap-server/exporter/pom.xml b/oap-server/exporter/pom.xml
index 4c371f88d7..00d7b83e91 100644
--- a/oap-server/exporter/pom.xml
+++ b/oap-server/exporter/pom.xml
@@ -38,6 +38,10 @@
             <artifactId>library-datacarrier-queue</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.grpc</groupId>
             <artifactId>grpc-testing</artifactId>
@@ -76,4 +80,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
similarity index 56%
rename from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
rename to oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
index c104759c17..87b374ba5b 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
@@ -16,24 +16,35 @@
  *
  */
 
-package org.apache.skywalking.oap.server.exporter.provider.grpc;
+package org.apache.skywalking.oap.server.exporter.provider;
 
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.exporter.ExporterModule;
+import org.apache.skywalking.oap.server.core.exporter.LogExportService;
 import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
+import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
+import org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCMetricsExporter;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.log.KafkaLogExporter;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.trace.KafkaTraceExporter;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
 
-public class GRPCExporterProvider extends ModuleProvider {
-    private GRPCExporterSetting setting;
-    private GRPCExporter exporter;
+public class ExporterProvider extends ModuleProvider {
+    private final ExporterSetting setting;
+    private GRPCMetricsExporter grpcMetricsExporter;
+    private KafkaTraceExporter kafkaTraceExporter;
+    private KafkaLogExporter kafkaLogExporter;
+
+    public ExporterProvider() {
+        setting = new ExporterSetting();
+    }
 
     @Override
     public String name() {
-        return "grpc";
+        return "default";
     }
 
     @Override
@@ -43,24 +54,37 @@ public class GRPCExporterProvider extends ModuleProvider {
 
     @Override
     public ModuleConfig createConfigBeanIfAbsent() {
-        setting = new GRPCExporterSetting();
         return setting;
     }
 
     @Override
     public void prepare() throws ServiceNotProvidedException, ModuleStartException {
-        exporter = new GRPCExporter(setting);
-        this.registerServiceImplementation(MetricValuesExportService.class, exporter);
+        grpcMetricsExporter = new GRPCMetricsExporter(setting);
+        kafkaTraceExporter = new KafkaTraceExporter(getManager(), setting);
+        kafkaLogExporter = new KafkaLogExporter(getManager(), setting);
+        this.registerServiceImplementation(MetricValuesExportService.class, grpcMetricsExporter);
+        this.registerServiceImplementation(TraceExportService.class, kafkaTraceExporter);
+        this.registerServiceImplementation(LogExportService.class, kafkaLogExporter);
     }
 
     @Override
     public void start() throws ServiceNotProvidedException, ModuleStartException {
-
+        if (setting.isEnableGRPCMetrics()) {
+            grpcMetricsExporter.start();
+        }
+        if (setting.isEnableKafkaTrace()) {
+            kafkaTraceExporter.start();
+        }
+        if (setting.isEnableKafkaLog()) {
+            kafkaLogExporter.start();
+        }
     }
 
     @Override
     public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
-        exporter.fetchSubscriptionList();
+        if (setting.isEnableGRPCMetrics()) {
+            grpcMetricsExporter.fetchSubscriptionList();
+        }
     }
 
     @Override
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
similarity index 65%
copy from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
copy to oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
index 74eaa483a7..b70682e79e 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.oap.server.exporter.provider.grpc;
+package org.apache.skywalking.oap.server.exporter.provider;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -24,9 +24,18 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 
 @Setter
 @Getter
-public class GRPCExporterSetting extends ModuleConfig {
-    private String targetHost;
-    private int targetPort;
+public class ExporterSetting extends ModuleConfig {
+    private boolean enableGRPCMetrics = false;
+    private String gRPCTargetHost;
+    private int gRPCTargetPort;
     private int bufferChannelSize = 20000;
     private int bufferChannelNum = 2;
+
+    //kafka
+    private boolean enableKafkaTrace = false;
+    private boolean enableKafkaLog = false;
+    private String kafkaBootstrapServers;
+    private String kafkaProducerConfig;
+    private String kafkaTopicTrace = "skywalking-export-trace";
+    private String kafkaTopicLog = "skywalking-export-log";
 }
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
similarity index 90%
rename from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
rename to oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
index 5fe5e5ba4b..cb1b49dca5 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
@@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
 import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
 import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
 import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
 import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
 import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
@@ -52,22 +53,26 @@ import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
 
 @Slf4j
-public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> {
+public class GRPCMetricsExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> {
     /**
      * The period of subscription list fetching is hardcoded as 30s.
      */
     private static final long FETCH_SUBSCRIPTION_PERIOD = 30_000;
-    private final GRPCExporterSetting setting;
-    private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
-    private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
-    private final DataCarrier exportBuffer;
-    private final ReentrantLock fetchListLock;
+    private final ExporterSetting setting;
+    private MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
+    private MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
+    private DataCarrier exportBuffer;
+    private ReentrantLock fetchListLock;
     private volatile List<SubscriptionMetric> subscriptionList;
     private volatile long lastFetchTimestamp = 0;
 
-    public GRPCExporter(GRPCExporterSetting setting) {
+    public GRPCMetricsExporter(ExporterSetting setting) {
         this.setting = setting;
-        GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
+    }
+
+    @Override
+    public void start() {
+        GRPCClient client = new GRPCClient(setting.getGRPCTargetHost(), setting.getGRPCTargetPort());
         client.connect();
         ManagedChannel channel = client.getChannel();
         exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
@@ -98,6 +103,11 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
         }
     }
 
+    /**
+     * Reports whether exporting metrics through gRPC is switched on.
+     *
+     * @return the value of {@code setting.isEnableGRPCMetrics()}.
+     */
+    @Override
+    public boolean isEnabled() {
+        final boolean grpcMetricsEnabled = setting.isEnableGRPCMetrics();
+        return grpcMetricsEnabled;
+    }
+
     /**
      * Read the subscription list.
      */
@@ -209,17 +219,17 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
 
             if (sleepTime > 2000L) {
                 log.warn(
-                    "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(),
+                    "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(),
                     setting
-                        .getTargetPort(), sleepTime
+                        .getGRPCTargetPort(), sleepTime
                 );
                 cycle = 2000L;
             }
         }
 
         log.debug(
-            "Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting
-                .getTargetPort(), sleepTime);
+            "Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(), setting
+                .getGRPCTargetPort(), sleepTime);
 
         fetchSubscriptionList();
     }
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/KafkaExportProducer.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/KafkaExportProducer.java
new file mode 100644
index 0000000000..64da5632c3
--- /dev/null
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/KafkaExportProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.exporter.provider.kafka;
+
+import com.google.gson.Gson;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+/**
+ * Base class for exporters that publish through Kafka. It keeps the exporter settings and lazily
+ * creates one {@link KafkaProducer} per instance, which subclasses obtain via {@link #getProducer()}.
+ */
+@Slf4j
+public abstract class KafkaExportProducer {
+    protected final ExporterSetting setting;
+    private volatile KafkaProducer<String, Bytes> producer;
+
+    public KafkaExportProducer(ExporterSetting setting) {
+        this.setting = setting;
+    }
+
+    /**
+     * Return the producer of this exporter, creating it on the first call.
+     *
+     * The creation is guarded by a lock and re-checked inside it, so concurrent first callers
+     * share one producer instead of each building their own and leaking the extra instances.
+     *
+     * @return the single producer owned by this instance.
+     */
+    protected KafkaProducer<String, Bytes> getProducer() {
+        // Read the volatile field once; the common path after initialization takes no lock.
+        KafkaProducer<String, Bytes> current = producer;
+        if (current == null) {
+            synchronized (this) {
+                current = producer;
+                if (current == null) {
+                    current = createProducer();
+                    producer = current;
+                }
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Build a producer from {@code setting.getKafkaBootstrapServers()} and, when present, the JSON
+     * overrides in {@code setting.getKafkaProducerConfig()}. Overrides are applied after the
+     * bootstrap servers, so they win on conflicting keys.
+     */
+    private KafkaProducer<String, Bytes> createProducer() {
+        Properties properties = new Properties();
+        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, setting.getKafkaBootstrapServers());
+        if (StringUtil.isNotEmpty(setting.getKafkaProducerConfig())) {
+            Gson gson = new Gson();
+            Properties override = gson.fromJson(setting.getKafkaProducerConfig(), Properties.class);
+            // fromJson yields null for documents such as "null"; putAll(null) would throw.
+            if (override != null) {
+                properties.putAll(override);
+            }
+        }
+        return new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
+    }
+}
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
new file mode 100644
index 0000000000..d8b9245a92
--- /dev/null
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.exporter.provider.kafka.log;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.skywalking.apm.network.logging.v3.JSONLog;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
+import org.apache.skywalking.apm.network.logging.v3.LogTags;
+import org.apache.skywalking.apm.network.logging.v3.TextLog;
+import org.apache.skywalking.apm.network.logging.v3.TraceContext;
+import org.apache.skywalking.apm.network.logging.v3.YAMLLog;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.exporter.LogExportService;
+import org.apache.skywalking.oap.server.core.query.type.ContentType;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
+import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
+import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+/**
+ * Exports {@link LogRecord}s through Kafka. Records passed to {@link #export(LogRecord)} are put
+ * into a {@link DataCarrier} buffer; {@link #consume(List)} converts each buffered record into a
+ * {@link LogData} message and sends it to the topic {@code setting.getKafkaTopicLog()}.
+ */
+@Slf4j
+public class KafkaLogExporter extends KafkaExportProducer implements LogExportService, IConsumer<LogRecord> {
+    private DataCarrier<LogRecord> exportBuffer;
+    private CounterMetrics successCounter;
+    private CounterMetrics errorCounter;
+    private final ModuleManager moduleManager;
+
+    public KafkaLogExporter(ModuleManager manager, ExporterSetting setting) {
+        super(setting);
+        this.moduleManager = manager;
+    }
+
+    /**
+     * Create the producer, the export buffer and the telemetry counters, then attach this
+     * instance to the buffer as its consumer.
+     */
+    @Override
+    public void start() {
+        super.getProducer();
+        exportBuffer = new DataCarrier<>(
+            "KafkaLogExporter", "KafkaLogExporter", setting.getBufferChannelNum(), setting.getBufferChannelSize(),
+            BufferStrategy.IF_POSSIBLE
+        );
+        MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
+                                                     .provider()
+                                                     .getService(MetricsCreator.class);
+        successCounter = metricsCreator.createCounter(
+            "kafka_exporter_log_success_count", "The success number of log exported by kafka exporter.",
+            new MetricsTag.Keys("protocol"),
+            new MetricsTag.Values("kafka")
+        );
+        errorCounter = metricsCreator.createCounter(
+            "kafka_exporter_log_error_count", "The error number of log exported by kafka exporter",
+            new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka")
+        );
+        // Attach the consumer last: the send callbacks in consume() use both counters, so they
+        // must exist before any record can be consumed.
+        exportBuffer.consume(this, 1, 200);
+    }
+
+    /**
+     * Put a record into the export buffer. {@code null} records are ignored.
+     */
+    @Override
+    public void export(final LogRecord logRecord) {
+        if (logRecord != null) {
+            exportBuffer.produce(logRecord);
+        }
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return setting.isEnableKafkaLog();
+    }
+
+    @Override
+    public void init(final Properties properties) {
+
+    }
+
+    /**
+     * Convert every non-null record of the batch into a {@link LogData} and send it, keyed by the
+     * record id. The send result is reported asynchronously through the success/error counters.
+     *
+     * @throws UnexpectedException when the tags of a record can not be parsed; the remaining
+     *                             records of the batch are not processed in that case.
+     */
+    @Override
+    public void consume(final List<LogRecord> data) {
+        for (LogRecord logRecord : data) {
+            if (logRecord != null) {
+                try {
+                    LogData logData = transLogData(logRecord);
+                    ProducerRecord<String, Bytes> record = new ProducerRecord<>(
+                        setting.getKafkaTopicLog(),
+                        logRecord.id(),
+                        Bytes.wrap(logData.toByteArray())
+                    );
+                    super.getProducer().send(record, (metadata, ex) -> {
+                        if (ex != null) {
+                            errorCounter.inc();
+                            log.error("Failed to export Log.", ex);
+                        } else {
+                            successCounter.inc();
+                        }
+                    });
+                } catch (InvalidProtocolBufferException e) {
+                    throw new UnexpectedException(
+                        "Failed to parse Log tags from LogRecord, id: " + logRecord.id() + ".", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Log the failure instead of dropping it silently, so a batch aborted by an exception from
+     * {@link #consume(List)} leaves a trace in the log.
+     */
+    @Override
+    public void onError(final List<LogRecord> data, final Throwable t) {
+        log.error("Failed to consume Log records for Kafka export.", t);
+    }
+
+    @Override
+    public void onExit() {
+
+    }
+
+    /**
+     * Build the {@link LogData} message for a stored record: body by content type, timestamp,
+     * service / instance / endpoint names resolved from their ids, trace context and tags.
+     *
+     * @throws InvalidProtocolBufferException when the raw tag data is not a valid {@link LogTags}.
+     * @throws UnexpectedException            when the content type is none of JSON, YAML, TEXT, NONE.
+     */
+    private LogData transLogData(LogRecord logRecord) throws InvalidProtocolBufferException {
+        LogData.Builder builder = LogData.newBuilder();
+        LogDataBody.Builder bodyBuilder = LogDataBody.newBuilder();
+        switch (ContentType.instanceOf(logRecord.getContentType())) {
+            case JSON:
+                bodyBuilder.setType(ContentType.JSON.name());
+                bodyBuilder.setJson(JSONLog.newBuilder().setJson(logRecord.getContent()));
+                break;
+            case YAML:
+                bodyBuilder.setType(ContentType.YAML.name());
+                bodyBuilder.setYaml(YAMLLog.newBuilder().setYaml(logRecord.getContent()));
+                break;
+            case TEXT:
+                bodyBuilder.setType(ContentType.TEXT.name());
+                bodyBuilder.setText(TextLog.newBuilder().setText(logRecord.getContent()));
+                break;
+            case NONE:
+                bodyBuilder.setType(ContentType.NONE.name());
+                break;
+            default:
+                throw new UnexpectedException(
+                    "Failed to parse Log ContentType value: " + logRecord.getContentType() + " from LogRecord, id: " + logRecord.id() + ".");
+        }
+        builder.setBody(bodyBuilder);
+
+        builder.setTimestamp(logRecord.getTimestamp());
+        builder.setService(IDManager.ServiceID.analysisId(logRecord.getServiceId()).getName());
+        // Instance and endpoint are optional on the record; only resolve them when present.
+        if (StringUtil.isNotEmpty(logRecord.getServiceInstanceId())) {
+            builder.setServiceInstance(
+                IDManager.ServiceInstanceID.analysisId(logRecord.getServiceInstanceId()).getName());
+        }
+        if (StringUtil.isNotEmpty(logRecord.getEndpointId())) {
+            builder.setEndpoint(
+                IDManager.EndpointID.analysisId(logRecord.getEndpointId()).getEndpointName());
+        }
+
+        TraceContext.Builder contextBuilder = TraceContext.newBuilder();
+        // The span id is only copied together with a segment id.
+        if (StringUtil.isNotEmpty(logRecord.getTraceSegmentId())) {
+            contextBuilder.setTraceSegmentId(logRecord.getTraceSegmentId());
+            contextBuilder.setSpanId(logRecord.getSpanId());
+        }
+        if (StringUtil.isNotEmpty(logRecord.getTraceId())) {
+            contextBuilder.setTraceId(logRecord.getTraceId());
+        }
+        builder.setTraceContext(contextBuilder);
+        if (logRecord.getTagsRawData() != null) {
+            builder.setTags(LogTags.parseFrom(logRecord.getTagsRawData()));
+        }
+        return builder.build();
+    }
+}
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
new file mode 100644
index 0000000000..243b09ea02
--- /dev/null
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.exporter.provider.kafka.trace;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
+import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
+import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+/**
+ * Exports {@link SegmentRecord}s through Kafka. Records passed to {@link #export(SegmentRecord)}
+ * are put into a {@link DataCarrier} buffer; {@link #consume(List)} parses each buffered record
+ * into a {@link SegmentObject} and sends it to the topic {@code setting.getKafkaTopicTrace()}.
+ */
+@Slf4j
+public class KafkaTraceExporter extends KafkaExportProducer implements TraceExportService, IConsumer<SegmentRecord> {
+    private DataCarrier<SegmentRecord> exportBuffer;
+    private CounterMetrics successCounter;
+    private CounterMetrics errorCounter;
+    private final ModuleManager moduleManager;
+
+    public KafkaTraceExporter(ModuleManager manager, ExporterSetting setting) {
+        super(setting);
+        this.moduleManager = manager;
+    }
+
+    /**
+     * Create the producer, the export buffer and the telemetry counters, then attach this
+     * instance to the buffer as its consumer.
+     */
+    @Override
+    public void start() {
+        super.getProducer();
+        exportBuffer = new DataCarrier<>(
+            "KafkaTraceExporter", "KafkaTraceExporter", setting.getBufferChannelNum(), setting.getBufferChannelSize(),
+            BufferStrategy.IF_POSSIBLE
+        );
+        MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
+                                                     .provider()
+                                                     .getService(MetricsCreator.class);
+        successCounter = metricsCreator.createCounter(
+            "kafka_exporter_trace_success_count", "The success number of traces exported by kafka exporter.",
+            new MetricsTag.Keys("protocol"),
+            new MetricsTag.Values("kafka")
+        );
+        errorCounter = metricsCreator.createCounter(
+            "kafka_exporter_trace_error_count", "The error number of traces exported by kafka exporter",
+            new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka")
+        );
+        // Attach the consumer last: the send callbacks in consume() use both counters, so they
+        // must exist before any record can be consumed.
+        exportBuffer.consume(this, 1, 200);
+    }
+
+    /**
+     * Put a record into the export buffer. {@code null} records are ignored.
+     */
+    @Override
+    public void export(SegmentRecord segmentRecord) {
+        if (segmentRecord != null) {
+            exportBuffer.produce(segmentRecord);
+        }
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return setting.isEnableKafkaTrace();
+    }
+
+    @Override
+    public void init(final Properties properties) {
+
+    }
+
+    /**
+     * Parse the binary payload of every non-null record into a {@link SegmentObject} and send it,
+     * keyed by its trace segment id. The send result is reported asynchronously through the
+     * success/error counters.
+     *
+     * @throws UnexpectedException when the payload of a record can not be parsed; the remaining
+     *                             records of the batch are not processed in that case.
+     */
+    @Override
+    public void consume(final List<SegmentRecord> data) {
+        for (SegmentRecord segmentRecord : data) {
+            if (segmentRecord != null) {
+                try {
+                    SegmentObject segmentObject = SegmentObject.parseFrom(segmentRecord.getDataBinary());
+                    ProducerRecord<String, Bytes> record = new ProducerRecord<>(
+                        setting.getKafkaTopicTrace(),
+                        segmentObject.getTraceSegmentId(),
+                        Bytes.wrap(segmentObject.toByteArray())
+                    );
+                    super.getProducer().send(record, (metadata, ex) -> {
+                        if (ex != null) {
+                            errorCounter.inc();
+                            log.error("Failed to export Trace.", ex);
+                        } else {
+                            successCounter.inc();
+                        }
+                    });
+                } catch (InvalidProtocolBufferException e) {
+                    throw new UnexpectedException(
+                        "Failed to parse SegmentObject from SegmentRecord, id: " + segmentRecord.getSegmentId() + ".", e
+                    );
+                }
+            }
+        }
+    }
+
+    /**
+     * Log the failure instead of dropping it silently, so a batch aborted by an exception from
+     * {@link #consume(List)} leaves a trace in the log.
+     */
+    @Override
+    public void onError(final List<SegmentRecord> data, final Throwable t) {
+        log.error("Failed to consume Trace records for Kafka export.", t);
+    }
+
+    @Override
+    public void onExit() {
+
+    }
+}
diff --git a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
index 0e65d5bdd0..56bedd4ae7 100644
--- a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ b/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -16,4 +16,4 @@
 #
 #
 
-org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCExporterProvider
\ No newline at end of file
+org.apache.skywalking.oap.server.exporter.provider.ExporterProvider
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
index 658a8ca02f..d2d307f200 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
 import java.util.ServiceLoader;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.exporter.ExporterModule;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterProvider;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
 import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
@@ -53,17 +55,17 @@ public class GRPCExporterProviderTest {
         assertTrue(moduleProviderIterator.hasNext());
 
         grpcExporterProvider = moduleProviderIterator.next();
-        assertTrue(grpcExporterProvider instanceof GRPCExporterProvider);
+        assertTrue(grpcExporterProvider instanceof ExporterProvider);
 
-        GRPCExporterSetting config = (GRPCExporterSetting) grpcExporterProvider.createConfigBeanIfAbsent();
+        ExporterSetting config = (ExporterSetting) grpcExporterProvider.createConfigBeanIfAbsent();
         assertNotNull(config);
-        assertNull(config.getTargetHost());
-        assertEquals(0, config.getTargetPort());
+        assertNull(config.getGRPCTargetHost());
+        assertEquals(0, config.getGRPCTargetPort());
         assertEquals(20000, config.getBufferChannelSize());
         assertEquals(2, config.getBufferChannelNum());
 
         //for test
-        config.setTargetHost("localhost");
+        config.setGRPCTargetHost("localhost");
 
         grpcExporterProvider.prepare();
 
@@ -72,7 +74,7 @@ public class GRPCExporterProviderTest {
 
     @Test
     public void name() {
-        assertEquals("grpc", grpcExporterProvider.name());
+        assertEquals("default", grpcExporterProvider.name());
     }
 
     @Test
@@ -82,7 +84,7 @@ public class GRPCExporterProviderTest {
 
     @Test
     public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
-        GRPCExporter exporter = mock(GRPCExporter.class);
+        GRPCMetricsExporter exporter = mock(GRPCMetricsExporter.class);
 
         ModuleManager manager = mock(ModuleManager.class);
         ModuleProviderHolder providerHolder = mock(ModuleProviderHolder.class);
@@ -95,7 +97,7 @@ public class GRPCExporterProviderTest {
         doNothing().when(exporter).fetchSubscriptionList();
 
         grpcExporterProvider.setManager(manager);
-        Whitebox.setInternalState(grpcExporterProvider, "exporter", exporter);
+        Whitebox.setInternalState(grpcExporterProvider, "grpcMetricsExporter", exporter);
         grpcExporterProvider.notifyAfterCompleted();
     }
 
@@ -106,4 +108,4 @@ public class GRPCExporterProviderTest {
         assertEquals(1, requireModules.length);
         assertEquals("core", requireModules[0]);
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index f4838408da..54cd928227 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.exporter.ExportData;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
 import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -37,7 +38,7 @@ import static org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventTy
 
 public class GRPCExporterTest {
 
-    private GRPCExporter exporter;
+    private GRPCMetricsExporter exporter;
 
     @Rule
     public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@@ -49,13 +50,14 @@ public class GRPCExporterTest {
 
     @Before
     public void setUp() throws Exception {
-        GRPCExporterSetting setting = new GRPCExporterSetting();
-        setting.setTargetHost("localhost");
-        setting.setTargetPort(9870);
-        exporter = new GRPCExporter(setting);
+        ExporterSetting setting = new ExporterSetting();
+        setting.setGRPCTargetHost("localhost");
+        setting.setGRPCTargetPort(9870);
+        exporter = new GRPCMetricsExporter(setting);
         grpcServerRule.getServiceRegistry().addService(service);
         stub = MetricExportServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
         Whitebox.setInternalState(exporter, "blockingStub", stub);
+        exporter.start();
     }
 
     @Test
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 43ccd4371f..39e4ac29d6 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -51,9 +51,6 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
-        <kafka-clients.version>2.4.1</kafka-clients.version>
-        <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
     </properties>
 
     <dependencies>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportMetricsWorker.java
similarity index 88%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportMetricsWorker.java
index 63090bc3ff..a4bf9389b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportMetricsWorker.java
@@ -28,10 +28,10 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
  * A bridge worker. If an {@link ExporterModule} provider is declared and provides an implementation of {@link
  * MetricValuesExportService}, forward the export data to it.
  */
-public class ExportWorker extends AbstractWorker<ExportEvent> {
+public class ExportMetricsWorker extends AbstractWorker<ExportEvent> {
     private MetricValuesExportService exportService;
 
-    public ExportWorker(ModuleDefineHolder moduleDefineHolder) {
+    public ExportMetricsWorker(ModuleDefineHolder moduleDefineHolder) {
         super(moduleDefineHolder);
     }
 
@@ -43,8 +43,9 @@ public class ExportWorker extends AbstractWorker<ExportEvent> {
                                                        .provider()
                                                        .getService(MetricValuesExportService.class);
             }
-            exportService.export(event);
+            if (exportService.isEnabled()) {
+                exportService.export(event);
+            }
         }
     }
-
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportRecordWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportRecordWorker.java
new file mode 100644
index 0000000000..2a59a0af81
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportRecordWorker.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.exporter.ExporterModule;
+import org.apache.skywalking.oap.server.core.exporter.LogExportService;
+import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+
+/**
+ * A bridge worker for records. {@link SegmentRecord}s are forwarded to the {@link TraceExportService}
+ * and {@link LogRecord}s to the {@link LogExportService} of the {@link ExporterModule}, when that
+ * module is present and the matching service reports itself as enabled. Other records are ignored.
+ */
+public class ExportRecordWorker extends AbstractWorker<Record> {
+    private TraceExportService traceExportService;
+    private LogExportService logExportService;
+
+    public ExportRecordWorker(ModuleDefineHolder moduleDefineHolder) {
+        super(moduleDefineHolder);
+    }
+
+    @Override
+    public void in(Record record) {
+        if (record instanceof SegmentRecord) {
+            forwardSegment((SegmentRecord) record);
+            return;
+        }
+        if (record instanceof LogRecord) {
+            forwardLog((LogRecord) record);
+        }
+    }
+
+    /**
+     * Hand a segment to the trace export service, looking the service up on first use.
+     */
+    private void forwardSegment(SegmentRecord segment) {
+        if (traceExportService == null) {
+            // Nothing to do while the exporter module is not available.
+            if (!getModuleDefineHolder().has(ExporterModule.NAME)) {
+                return;
+            }
+            traceExportService = getModuleDefineHolder().find(ExporterModule.NAME)
+                                                        .provider()
+                                                        .getService(TraceExportService.class);
+        }
+        if (traceExportService.isEnabled()) {
+            traceExportService.export(segment);
+        }
+    }
+
+    /**
+     * Hand a log record to the log export service, looking the service up on first use.
+     */
+    private void forwardLog(LogRecord logRecord) {
+        if (logExportService == null) {
+            // Nothing to do while the exporter module is not available.
+            if (!getModuleDefineHolder().has(ExporterModule.NAME)) {
+                return;
+            }
+            logExportService = getModuleDefineHolder().find(ExporterModule.NAME)
+                                                      .provider()
+                                                      .getService(LogExportService.class);
+        }
+        if (logExportService.isEnabled()) {
+            logExportService.export(logRecord);
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 1fb1d41ae7..7885c0738e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -205,7 +205,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
                                                            MetricsTransWorker transWorker,
                                                            boolean supportUpdate) {
         AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
-        ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
+        ExportMetricsWorker exportWorker = new ExportMetricsWorker(moduleDefineHolder);
 
         MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
             moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker,
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 0ef237f0c1..757ef24f77 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.io.IOException;
+import java.util.Optional;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
@@ -37,12 +38,14 @@ public class RecordPersistentWorker extends AbstractWorker<Record> {
     private final Model model;
     private final IRecordDAO recordDAO;
     private final IBatchDAO batchDAO;
+    private final Optional<AbstractWorker<Record>> nextExportWorker;
 
-    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
+    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO, AbstractWorker<Record> nextExportWorker) {
         super(moduleDefineHolder);
         this.model = model;
         this.recordDAO = recordDAO;
         this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
+        this.nextExportWorker = Optional.ofNullable(nextExportWorker);
     }
 
     @Override
@@ -53,5 +56,6 @@ public class RecordPersistentWorker extends AbstractWorker<Record> {
         } catch (IOException e) {
             LOGGER.error(e.getMessage(), e);
         }
+        this.nextExportWorker.ifPresent(exportWorker -> exportWorker.in(record));
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index e1c586d830..893152794a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -76,7 +76,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
         // Record stream doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
         Model model = modelSetter.add(
             recordClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.Second), true);
-        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
+        ExportRecordWorker exportWorker = new ExportRecordWorker(moduleDefineHolder);
+        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO, exportWorker);
 
         workers.put(recordClass, persistentWorker);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
index 8a409a3358..9be78c111e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
@@ -29,6 +29,9 @@ public class ExporterModule extends ModuleDefine {
 
     @Override
     public Class[] services() {
-        return new Class[] {MetricValuesExportService.class};
+        return new Class[] {
+            MetricValuesExportService.class,
+            TraceExportService.class,
+            LogExportService.class};
     }
 }
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
similarity index 67%
rename from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
index 74eaa483a7..a322dae22a 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
@@ -16,17 +16,15 @@
  *
  */
 
-package org.apache.skywalking.oap.server.exporter.provider.grpc;
+package org.apache.skywalking.oap.server.core.exporter;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.Service;
 
-@Setter
-@Getter
-public class GRPCExporterSetting extends ModuleConfig {
-    private String targetHost;
-    private int targetPort;
-    private int bufferChannelSize = 20000;
-    private int bufferChannelNum = 2;
+public interface ExporterService<T> extends Service {
+
+    void start();
+
+    void export(T data);
+
+    boolean isEnabled();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/LogExportService.java
similarity index 71%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/LogExportService.java
index 8a409a3358..7c20a2e7b6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/LogExportService.java
@@ -18,17 +18,13 @@
 
 package org.apache.skywalking.oap.server.core.exporter;
 
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.library.module.Service;
 
-public class ExporterModule extends ModuleDefine {
-    public static final String NAME = "exporter";
-
-    public ExporterModule() {
-        super(NAME);
-    }
+/**
+ * Export log records through this service.
+ */
+public interface LogExportService extends Service, ExporterService<LogRecord> {
 
-    @Override
-    public Class[] services() {
-        return new Class[] {MetricValuesExportService.class};
-    }
+    void export(LogRecord logRecord);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
index dd83788fd3..9ee953cc3f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
@@ -21,9 +21,9 @@ package org.apache.skywalking.oap.server.core.exporter;
 import org.apache.skywalking.oap.server.library.module.Service;
 
 /**
- * Export the metrics value from metrics through this service, if provider exists.
+ * Export the metrics value from metrics through this service.
  */
-public interface MetricValuesExportService extends Service {
+public interface MetricValuesExportService extends Service, ExporterService<ExportEvent> {
     /**
      * This method is sync-mode export, the performance effects the persistence result. Queue mode is highly
      * recommended.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/TraceExportService.java
similarity index 71%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/TraceExportService.java
index 8a409a3358..3da68a72bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/TraceExportService.java
@@ -18,17 +18,13 @@
 
 package org.apache.skywalking.oap.server.core.exporter;
 
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.library.module.Service;
 
-public class ExporterModule extends ModuleDefine {
-    public static final String NAME = "exporter";
-
-    public ExporterModule() {
-        super(NAME);
-    }
+/**
+ * Export trace segment records through this service.
+ */
+public interface TraceExportService extends Service, ExporterService<SegmentRecord> {
 
-    @Override
-    public Class[] services() {
-        return new Class[] {MetricValuesExportService.class};
-    }
+    void export(SegmentRecord segmentRecord);
 }
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml
index b654e11149..b74b9c3dd2 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml
@@ -42,13 +42,10 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>${kafka-clients.version}</version>
         </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka-test</artifactId>
-            <version>${spring-kafka-test.version}</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
index 3c08d83564..500595f23d 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
@@ -36,7 +36,6 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>${kafka-clients.version}</version>
         </dependency>
     </dependencies>
 </project>
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 9c4512cd0e..fba48e31e2 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -501,9 +501,19 @@ configuration:
 
 exporter:
   selector: ${SW_EXPORTER:-}
-  grpc:
-    targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
-    targetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+  default:
+    # gRPC exporter
+    enableGRPCMetrics: ${SW_EXPORTER_ENABLE_GRPC_METRICS:false}
+    gRPCTargetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
+    gRPCTargetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+    # Kafka exporter
+    enableKafkaTrace: ${SW_EXPORTER_ENABLE_KAFKA_TRACE:false}
+    enableKafkaLog: ${SW_EXPORTER_ENABLE_KAFKA_LOG:false}
+    kafkaBootstrapServers: ${SW_EXPORTER_KAFKA_SERVERS:localhost:9092}
+    # Kafka producer config, JSON format as Properties.
+    kafkaProducerConfig: ${SW_EXPORTER_KAFKA_PRODUCER_CONFIG:""}
+    kafkaTopicTrace: ${SW_EXPORTER_KAFKA_TOPIC_TRACE:skywalking-export-trace}
+    kafkaTopicLog: ${SW_EXPORTER_KAFKA_TOPIC_LOG:skywalking-export-log}
 
 health-checker:
   selector: ${SW_HEALTH_CHECKER:-}
diff --git a/test/e2e-v2/cases/exporter/kafka/docker-compose.yml b/test/e2e-v2/cases/exporter/kafka/docker-compose.yml
new file mode 100644
index 0000000000..bc188e4d6d
--- /dev/null
+++ b/test/e2e-v2/cases/exporter/kafka/docker-compose.yml
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+  zookeeper:
+    image: zookeeper:3.4
+    hostname: zookeeper
+    expose:
+      - 2181
+    networks:
+      - e2e
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+    healthcheck:
+      test: [ "CMD", "sh", "-c", "nc -nz 127.0.0.1 2181" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  broker-a:
+    image: bitnami/kafka:2.4.1
+    hostname: broker-a
+    expose:
+      - 9092
+    networks:
+      - e2e
+    environment:
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_BROKER_ID=10
+      - ALLOW_PLAINTEXT_LISTENER=yes
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", "zookeeper:2181" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  oap:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: oap
+    environment:
+      SW_EXPORTER: default
+      SW_EXPORTER_ENABLE_KAFKA_TRACE: "true"
+      SW_EXPORTER_ENABLE_KAFKA_LOG: "true"
+      SW_EXPORTER_KAFKA_SERVERS: broker-a:9092
+      SW_TELEMETRY: prometheus
+    ports:
+      - 1234
+      - 12800
+    depends_on:
+      broker-a:
+        condition: service_healthy
+    networks:
+      - e2e
+
+  provider:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: provider
+    ports:
+      - 9090
+    depends_on:
+      oap:
+        condition: service_healthy
+
+  consumer:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: consumer
+    ports:
+      - 9092
+    depends_on:
+      provider:
+        condition: service_healthy
+networks:
+  e2e:
diff --git a/test/e2e-v2/cases/exporter/kafka/e2e.yaml b/test/e2e-v2/cases/exporter/kafka/e2e.yaml
new file mode 100644
index 0000000000..d7220c4cfa
--- /dev/null
+++ b/test/e2e-v2/cases/exporter/kafka/e2e.yaml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+  env: compose
+  file: docker-compose.yml
+  timeout: 20m
+  init-system-environment: ../../../script/env
+  steps:
+    - name: set PATH
+      command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
+    - name: install yq
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+    - name: install swctl
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+trigger:
+  action: http
+  interval: 3s
+  times: 10
+  url: http://${consumer_host}:${consumer_9092}/users
+  method: POST
+  body: '{"id":"123","name":"skywalking"}'
+  headers:
+    "Content-Type": "application/json"
+
+verify:
+  retry:
+    count: 20
+    interval: 3s
+  cases:
+    - includes:
+        - exporter-cases.yaml
diff --git a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/test/e2e-v2/cases/exporter/kafka/expected/result.yml
similarity index 87%
copy from oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
copy to test/e2e-v2/cases/exporter/kafka/expected/result.yml
index 0e65d5bdd0..2f4eac1576 100644
--- a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ b/test/e2e-v2/cases/exporter/kafka/expected/result.yml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-#
 
-org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCExporterProvider
\ No newline at end of file
+# "1" means that all received traces/logs were exported successfully: {exported / received == 1}
+result: 1
diff --git a/test/e2e-v2/cases/exporter/kafka/exporter-cases.yaml b/test/e2e-v2/cases/exporter/kafka/exporter-cases.yaml
new file mode 100644
index 0000000000..c77185c388
--- /dev/null
+++ b/test/e2e-v2/cases/exporter/kafka/exporter-cases.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cases:
+  # verify traces
+  - query: |
+        sleep 10;
+        trace_in=$(curl -s http://${oap_host}:${oap_1234} | grep trace_in_latency_count{ | awk '{print $2}'); \
+        trace_out=$(curl -s http://${oap_host}:${oap_1234} | grep kafka_exporter_trace_success_count{ | awk '{print $2}'); \
+        awk 'BEGIN {print '$trace_out/$trace_in'}' | yq e '{"result": .}' -
+    expected: expected/result.yml
+
+  # verify logs
+  - query: |
+      curl -X POST http://${provider_host}:${provider_9090}/logs/trigger > /dev/null;
+      curl -X POST http://${provider_host}:${provider_9090}/logs/trigger > /dev/null;
+      curl -X POST http://${provider_host}:${provider_9090}/logs/trigger > /dev/null;
+      sleep 10;
+        log_in=$(curl -s http://${oap_host}:${oap_1234} | grep log_in_latency_count{ | awk '{print $2}'); \
+        log_out=$(curl -s http://${oap_host}:${oap_1234} | grep kafka_exporter_log_success_count{ | awk '{print $2}'); \
+        awk 'BEGIN {print '$log_out/$log_in'}' | yq e '{"result": .}' -
+    expected: expected/result.yml