Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/10/21 00:04:34 UTC
[skywalking] branch master updated: Support export `Trace` and `Log` through Kafka. (#9817)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new d98775790a Support export `Trace` and `Log` through Kafka. (#9817)
d98775790a is described below
commit d98775790aad32c6dc3a06bba6ec96fa8d10c2ad
Author: Wan Kai <wa...@foxmail.com>
AuthorDate: Fri Oct 21 08:04:18 2022 +0800
Support export `Trace` and `Log` through Kafka. (#9817)
---
.github/workflows/skywalking.yaml | 3 +
docs/en/changes/changes.md | 1 +
docs/en/setup/backend/configuration-vocabulary.md | 11 +-
docs/en/setup/backend/exporter.md | 121 +++++++++++++
docs/en/setup/backend/metrics-exporter.md | 82 +--------
docs/menu.yml | 4 +-
oap-server-bom/pom.xml | 14 ++
oap-server/exporter/pom.xml | 6 +-
...ExporterProvider.java => ExporterProvider.java} | 44 +++--
...PCExporterSetting.java => ExporterSetting.java} | 17 +-
...{GRPCExporter.java => GRPCMetricsExporter.java} | 34 ++--
.../provider/kafka/KafkaExportProducer.java | 54 ++++++
.../provider/kafka/log/KafkaLogExporter.java | 189 +++++++++++++++++++++
.../provider/kafka/trace/KafkaTraceExporter.java | 130 ++++++++++++++
...alking.oap.server.library.module.ModuleProvider | 2 +-
.../provider/grpc/GRPCExporterProviderTest.java | 20 ++-
.../exporter/provider/grpc/GRPCExporterTest.java | 12 +-
oap-server/pom.xml | 3 -
...{ExportWorker.java => ExportMetricsWorker.java} | 9 +-
.../core/analysis/worker/ExportRecordWorker.java | 64 +++++++
.../analysis/worker/MetricsStreamProcessor.java | 2 +-
.../analysis/worker/RecordPersistentWorker.java | 6 +-
.../analysis/worker/RecordStreamProcessor.java | 3 +-
.../oap/server/core/exporter/ExporterModule.java | 5 +-
.../oap/server/core/exporter/ExporterService.java} | 20 +--
.../{ExporterModule.java => LogExportService.java} | 18 +-
.../core/exporter/MetricValuesExportService.java | 4 +-
...ExporterModule.java => TraceExportService.java} | 18 +-
.../kafka-fetcher-plugin/pom.xml | 5 +-
.../zipkin-receiver-plugin/pom.xml | 1 -
.../src/main/resources/application.yml | 16 +-
.../e2e-v2/cases/exporter/kafka/docker-compose.yml | 93 ++++++++++
test/e2e-v2/cases/exporter/kafka/e2e.yaml | 47 +++++
.../cases/exporter/kafka/expected/result.yml | 6 +-
.../cases/exporter/kafka/exporter-cases.yaml | 34 ++++
35 files changed, 913 insertions(+), 185 deletions(-)
diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index c94e4c5188..57a9f363ed 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -584,6 +584,9 @@ jobs:
config: test/e2e-v2/cases/zipkin/mysql/sharding/e2e.yaml
- name: APISIX metrics
config: test/e2e-v2/cases/apisix/otel-collector/e2e.yaml
+
+ - name: Exporter Kafka
+ config: test/e2e-v2/cases/exporter/kafka/e2e.yaml
steps:
- uses: actions/checkout@v3
with:
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 1fc02be16f..cee93a7d7c 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -63,6 +63,7 @@
* Optimize the query time of tasks in ProfileTaskCache.
* Fix metrics was put into wrong slot of the window in the alerting kernel.
* Support `sumPerMinLabeled` in `MAL`.
+* Support export `Trace` and `Log` through Kafka.
#### UI
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index eb53ab9924..efa441e5f9 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -292,8 +292,15 @@ The Configuration Vocabulary lists all available configurations provided by `app
| - | - | password | Nacos Auth password. [...]
| - | - | accessKey | Nacos Auth accessKey. [...]
| - | - | secretKey | Nacos Auth secretKey. [...]
-| exporter | grpc | targetHost | The host of target gRPC server for receiving export data. [...]
-| - | - | targetPort | The port of target gRPC server for receiving export data. [...]
+| exporter | default | enableGRPCMetrics | Enable gRPC metrics exporter. [...]
+| - | - | gRPCTargetHost | The host of target gRPC server for receiving export data [...]
+| - | - | gRPCTargetPort | The port of target gRPC server for receiving export data. [...]
+| - | - | enableKafkaTrace | Enable Kafka trace exporter. [...]
+| - | - | enableKafkaLog | Enable Kafka log exporter. [...]
+| - | - | kafkaBootstrapServers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. [...]
+| - | - | kafkaProducerConfig | Kafka producer config, JSON format as Properties. [...]
+| - | - | kafkaTopicTrace | Kafka topic name for trace. [...]
+| - | - | kafkaTopicLog | Kafka topic name for log. [...]
| health-checker | default | checkIntervalSeconds | The period of checking OAP internal health status (in seconds). [...]
| configuration-discovery | default | disableMessageDigest | If true, agent receives the latest configuration every time, even without making any changes. By default, OAP uses the SHA512 message digest mechanism to detect changes in configuration. [...]
| receiver-event | default | gRPC services that handle events data. | - [...]
diff --git a/docs/en/setup/backend/exporter.md b/docs/en/setup/backend/exporter.md
new file mode 100644
index 0000000000..468f42bef2
--- /dev/null
+++ b/docs/en/setup/backend/exporter.md
@@ -0,0 +1,121 @@
+# Exporter
+SkyWalking provides the essential functions of observability, including metrics aggregation, trace, log, alerting, and profiling.
+In many real-world scenarios, users may want to forward their data to a 3rd party system for further in-depth analysis.
+**Exporter** has made that possible.
+
+The exporter is an independent module that has to be manually activated.
+
+Right now, we provide the following exporting channels:
+1. gRPC Exporter
+- [Metrics](#metrics-grpc-exporter)
+1. Kafka Exporter
+- [Trace](#trace-kafka-exporter)
+- [Log](#log-kafka-exporter)
+
+## gRPC Exporter
+### Metrics gRPC Exporter
+Metrics gRPC exporter uses SkyWalking's native export service definition. Here is the proto definition: [metric-exporter.proto](https://github.com/apache/skywalking/blob/master/oap-server/exporter/src/main/proto/metric-exporter.proto).
+```proto
+service MetricExportService {
+ rpc export (stream ExportMetricValue) returns (ExportResponse) {
+ }
+
+ rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
+ }
+}
+```
+
+To activate the exporter, set `${SW_EXPORTER_ENABLE_GRPC_METRICS:true}` and configure the target gRPC server address.
+```yaml
+exporter:
+ default:
+ # gRPC exporter
+ enableGRPCMetrics: ${SW_EXPORTER_ENABLE_GRPC_METRICS:true}
+ gRPCTargetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
+ gRPCTargetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+ ...
+```
+
+- `gRPCTargetHost`:`gRPCTargetPort` is the address of the target service. You can point it at any gRPC server that should receive the data.
+- The target gRPC service must be up and ready before the OAP starts; otherwise, the OAP startup may fail.
+
+#### Target exporter service
+1. Subscription implementation.
+Return the list of expected metric names, each with its event type (incremental or total). All names must match the OAL/MAL script definitions.
+Return an empty list if you want to export all metrics with the incremental event type.
+
+2. Export implementation.
+Stream service. All subscribed metrics will be sent here based on the OAP core schedule. Also, if the OAP is deployed as a cluster,
+this method will be called concurrently. For metrics value, you need to follow `#type` to choose `#longValue` or `#doubleValue`.
+
+## Kafka Exporter
+### Trace Kafka Exporter
+The trace Kafka exporter pushes messages to the Kafka broker, under the topic `skywalking-trace`, to export traces. Here is the message format:
+```
+ProducerRecord<String, Bytes>
+Key: TraceSegmentId
+Value: Bytes of SegmentObject
+```
+
+The `SegmentObject` definition follows the protocol:
+[SkyWalking data collect protocol#Tracing.proto](https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto).
+```proto
+// The segment is a collection of spans. It includes all collected spans in a simple one request context, such as a HTTP request process.
+message SegmentObject {
+ string traceId = 1;
+ string traceSegmentId = 2;
+ repeated SpanObject spans = 3;
+ string service = 4;
+ string serviceInstance = 5;
+ bool isSizeLimited = 6;
+}
+```
+
+To activate the exporter, set `${SW_EXPORTER_ENABLE_KAFKA_TRACE:true}` and configure the Kafka server.
+```yaml
+exporter:
+ default:
+ # Kafka exporter
+ enableKafkaTrace: ${SW_EXPORTER_ENABLE_KAFKA_TRACE:true}
+ kafkaBootstrapServers: ${SW_EXPORTER_KAFKA_SERVERS:localhost:9092}
+ # Kafka producer config, JSON format as Properties.
+ kafkaProducerConfig: ${SW_EXPORTER_KAFKA_PRODUCER_CONFIG:""}
+ kafkaTopicTrace: ${SW_EXPORTER_KAFKA_TOPIC_TRACE:skywalking-trace}
+ ...
+```
+
+### Log Kafka Exporter
+The log Kafka exporter pushes messages to the Kafka broker, under the topic `skywalking-log`, to export logs. Here is the message format:
+```
+ProducerRecord<String, Bytes>
+Key: LogRecordId
+Value: Bytes of LogData
+```
+
+The `LogData` definition follows the protocol:
+[SkyWalking data collect protocol#Logging.proto](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto).
+```proto
+message LogData {
+ int64 timestamp = 1;
+ string service = 2;
+ string serviceInstance = 3;
+ string endpoint = 4;
+ LogDataBody body = 5;
+ TraceContext traceContext = 6;
+ LogTags tags = 7;
+ string layer = 8;
+}
+```
+
+To activate the exporter, set `${SW_EXPORTER_ENABLE_KAFKA_LOG:true}` and configure the Kafka server.
+```yaml
+exporter:
+ default:
+ # Kafka exporter
+ enableKafkaLog: ${SW_EXPORTER_ENABLE_KAFKA_LOG:true}
+ kafkaBootstrapServers: ${SW_EXPORTER_KAFKA_SERVERS:localhost:9092}
+ # Kafka producer config, JSON format as Properties.
+ kafkaProducerConfig: ${SW_EXPORTER_KAFKA_PRODUCER_CONFIG:""}
+ kafkaTopicLog: ${SW_EXPORTER_KAFKA_TOPIC_LOG:skywalking-log}
+ ...
+```
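
A reading aid for the `exporter.md` text above, not part of the commit: the export implementation is told to follow `#type` when choosing between `#longValue` and `#doubleValue`. The Java sketch below shows one way a receiving service might do that dispatch. `MetricValue` is a hypothetical plain-Java stand-in for the generated `ExportMetricValue` message, and the `ValueType` constants are assumed to match the proto listing shown in the old `metrics-exporter.md`; a real receiver would use the classes generated from `metric-exporter.proto`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetricValueReader {
    /** Assumed to mirror the ValueType enum of metric-exporter.proto. */
    public enum ValueType { LONG, DOUBLE, MULTI_LONG }

    /** Hypothetical stand-in for the generated ExportMetricValue message. */
    public static final class MetricValue {
        final ValueType type;
        final long longValue;
        final double doubleValue;
        final List<Long> longValues;

        public MetricValue(ValueType type, long longValue, double doubleValue, List<Long> longValues) {
            this.type = type;
            this.longValue = longValue;
            this.doubleValue = doubleValue;
            this.longValues = longValues;
        }
    }

    /** Reads only the field that matches the declared type and renders it as text. */
    public static String render(MetricValue value) {
        switch (value.type) {
            case LONG:
                return Long.toString(value.longValue);
            case DOUBLE:
                return Double.toString(value.doubleValue);
            case MULTI_LONG:
                return value.longValues.toString();
            default:
                throw new IllegalArgumentException("Unknown value type: " + value.type);
        }
    }

    public static void main(String[] args) {
        List<Long> none = Collections.<Long>emptyList();
        System.out.println(render(new MetricValue(ValueType.LONG, 42L, 0.0, none)));
        System.out.println(render(new MetricValue(ValueType.DOUBLE, 0L, 1.5, none)));
        System.out.println(render(new MetricValue(ValueType.MULTI_LONG, 0L, 0.0, Arrays.asList(1L, 2L))));
    }
}
```

Reading the field that does not match `type` would silently yield the protobuf default (zero), which is why the dispatch is on `type` rather than on which field looks non-zero.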
diff --git a/docs/en/setup/backend/metrics-exporter.md b/docs/en/setup/backend/metrics-exporter.md
index 2b497861ac..aa0dfcd879 100644
--- a/docs/en/setup/backend/metrics-exporter.md
+++ b/docs/en/setup/backend/metrics-exporter.md
@@ -1,81 +1 @@
-# Metrics Exporter
-SkyWalking provides the essential functions of metrics aggregation, alarm, and analysis.
-In many real-world scenarios, users may want to forward their data to a 3rd party system for further in-depth analysis.
-**Metrics Exporter** has made that possible.
-
-The metrics exporter is an independent module that has to be manually activated.
-
-Right now, we provide the following exporters:
-1. gRPC exporter
-
-## gRPC exporter
-gRPC exporter uses SkyWalking's native exporter service definition. Here is the proto definition.
-```proto
-service MetricExportService {
- rpc export (stream ExportMetricValue) returns (ExportResponse) {
- }
-
- rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
- }
-}
-
-message ExportMetricValue {
- string metricName = 1;
- string entityName = 2;
- string entityId = 3;
- ValueType type = 4;
- int64 timeBucket = 5;
- int64 longValue = 6;
- double doubleValue = 7;
- repeated int64 longValues = 8;
-}
-
-message SubscriptionsResp {
- repeated SubscriptionMetric metrics = 1;
-}
-
-message SubscriptionMetric {
- string metricName = 1;
- EventType eventType = 2;
-}
-
-enum ValueType {
- LONG = 0;
- DOUBLE = 1;
- MULTI_LONG = 2;
-}
-
-enum EventType {
- // The metrics aggregated in this bulk, not include the existing persistent data.
- INCREMENT = 0;
- // Final result of the metrics at this moment.
- TOTAL = 1;
-}
-
-message SubscriptionReq {
-
-}
-
-message ExportResponse {
-}
-```
-
-To activate the exporter, you should add this into your `application.yml`
-```yaml
-exporter:
- grpc:
- targetHost: 127.0.0.1
- targetPort: 9870
-```
-
-- `targetHost`:`targetPort` is the expected target service address. You could set any gRPC server to receive the data.
-- Target gRPC service needs to go on standby; otherwise, the OAP startup may fail.
-
-## Target exporter service
-### Subscription implementation
-Return the expected metrics name list with event type (incremental or total). All names must match the OAL/MAL script definition.
-Return empty list, if you want to export all metrics in the incremental event type.
-
-### Export implementation
-Stream service. All subscribed metrics will be sent here based on the OAP core schedule. Also, if the OAP is deployed as a cluster,
-this method will be called concurrently. For metrics value, you need to follow `#type` to choose `#longValue` or `#doubleValue`.
+All SkyWalking exporter (metrics, trace, log) instructions have been moved [here](exporter.md).
diff --git a/docs/menu.yml b/docs/menu.yml
index 7b4dbb7b5f..167a2c11ab 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -133,8 +133,8 @@ catalog:
path: "/en/setup/backend/on-demand-pod-log"
- name: "Extension"
catalog:
- - name: "Metrics Exporter"
- path: "/en/setup/backend/metrics-exporter"
+ - name: "Exporter"
+ path: "/en/setup/backend/exporter"
- name: "Dynamic Configuration"
path: "/en/setup/backend/dynamic-config"
- name: "UI Setup"
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index b5b6f4e3f7..97ad598fef 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -73,6 +73,8 @@
<httpcore.version>4.4.13</httpcore.version>
<commons-compress.version>1.21</commons-compress.version>
<banyandb-java-client.version>0.1.0</banyandb-java-client.version>
+ <kafka-clients.version>2.4.1</kafka-clients.version>
+ <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
</properties>
<dependencyManagement>
@@ -554,6 +556,18 @@
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-clients.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <version>${spring-kafka-test.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/oap-server/exporter/pom.xml b/oap-server/exporter/pom.xml
index 4c371f88d7..00d7b83e91 100644
--- a/oap-server/exporter/pom.xml
+++ b/oap-server/exporter/pom.xml
@@ -38,6 +38,10 @@
<artifactId>library-datacarrier-queue</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
@@ -76,4 +80,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
similarity index 56%
rename from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
rename to oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
index c104759c17..87b374ba5b 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterProvider.java
@@ -16,24 +16,35 @@
*
*/
-package org.apache.skywalking.oap.server.exporter.provider.grpc;
+package org.apache.skywalking.oap.server.exporter.provider;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.exporter.ExporterModule;
+import org.apache.skywalking.oap.server.core.exporter.LogExportService;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
+import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
+import org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCMetricsExporter;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.log.KafkaLogExporter;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.trace.KafkaTraceExporter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-public class GRPCExporterProvider extends ModuleProvider {
- private GRPCExporterSetting setting;
- private GRPCExporter exporter;
+public class ExporterProvider extends ModuleProvider {
+ private final ExporterSetting setting;
+ private GRPCMetricsExporter grpcMetricsExporter;
+ private KafkaTraceExporter kafkaTraceExporter;
+ private KafkaLogExporter kafkaLogExporter;
+
+ public ExporterProvider() {
+ setting = new ExporterSetting();
+ }
@Override
public String name() {
- return "grpc";
+ return "default";
}
@Override
@@ -43,24 +54,37 @@ public class GRPCExporterProvider extends ModuleProvider {
@Override
public ModuleConfig createConfigBeanIfAbsent() {
- setting = new GRPCExporterSetting();
return setting;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- exporter = new GRPCExporter(setting);
- this.registerServiceImplementation(MetricValuesExportService.class, exporter);
+ grpcMetricsExporter = new GRPCMetricsExporter(setting);
+ kafkaTraceExporter = new KafkaTraceExporter(getManager(), setting);
+ kafkaLogExporter = new KafkaLogExporter(getManager(), setting);
+ this.registerServiceImplementation(MetricValuesExportService.class, grpcMetricsExporter);
+ this.registerServiceImplementation(TraceExportService.class, kafkaTraceExporter);
+ this.registerServiceImplementation(LogExportService.class, kafkaLogExporter);
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
-
+ if (setting.isEnableGRPCMetrics()) {
+ grpcMetricsExporter.start();
+ }
+ if (setting.isEnableKafkaTrace()) {
+ kafkaTraceExporter.start();
+ }
+ if (setting.isEnableKafkaLog()) {
+ kafkaLogExporter.start();
+ }
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- exporter.fetchSubscriptionList();
+ if (setting.isEnableGRPCMetrics()) {
+ grpcMetricsExporter.fetchSubscriptionList();
+ }
}
@Override
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
similarity index 65%
copy from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
copy to oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
index 74eaa483a7..b70682e79e 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.exporter.provider.grpc;
+package org.apache.skywalking.oap.server.exporter.provider;
import lombok.Getter;
import lombok.Setter;
@@ -24,9 +24,18 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Setter
@Getter
-public class GRPCExporterSetting extends ModuleConfig {
- private String targetHost;
- private int targetPort;
+public class ExporterSetting extends ModuleConfig {
+ private boolean enableGRPCMetrics = false;
+ private String gRPCTargetHost;
+ private int gRPCTargetPort;
private int bufferChannelSize = 20000;
private int bufferChannelNum = 2;
+
+ //kafka
+ private boolean enableKafkaTrace = false;
+ private boolean enableKafkaLog = false;
+ private String kafkaBootstrapServers;
+ private String kafkaProducerConfig;
+ private String kafkaTopicTrace = "skywalking-export-trace";
+ private String kafkaTopicLog = "skywalking-export-log";
}
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
similarity index 90%
rename from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
rename to oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
index 5fe5e5ba4b..cb1b49dca5 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java
@@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.exporter.grpc.ValueType;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
@@ -52,22 +53,26 @@ import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
@Slf4j
-public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> {
+public class GRPCMetricsExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> {
/**
* The period of subscription list fetching is hardcoded as 30s.
*/
private static final long FETCH_SUBSCRIPTION_PERIOD = 30_000;
- private final GRPCExporterSetting setting;
- private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
- private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
- private final DataCarrier exportBuffer;
- private final ReentrantLock fetchListLock;
+ private final ExporterSetting setting;
+ private MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
+ private MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
+ private DataCarrier exportBuffer;
+ private ReentrantLock fetchListLock;
private volatile List<SubscriptionMetric> subscriptionList;
private volatile long lastFetchTimestamp = 0;
- public GRPCExporter(GRPCExporterSetting setting) {
+ public GRPCMetricsExporter(ExporterSetting setting) {
this.setting = setting;
- GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
+ }
+
+ @Override
+ public void start() {
+ GRPCClient client = new GRPCClient(setting.getGRPCTargetHost(), setting.getGRPCTargetPort());
client.connect();
ManagedChannel channel = client.getChannel();
exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
@@ -98,6 +103,11 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
}
+ @Override
+ public boolean isEnabled() {
+ return setting.isEnableGRPCMetrics();
+ }
+
/**
* Read the subscription list.
*/
@@ -209,17 +219,17 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
if (sleepTime > 2000L) {
log.warn(
- "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(),
+ "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(),
setting
- .getTargetPort(), sleepTime
+ .getGRPCTargetPort(), sleepTime
);
cycle = 2000L;
}
}
log.debug(
- "Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting
- .getTargetPort(), sleepTime);
+ "Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(), setting
+ .getGRPCTargetPort(), sleepTime);
fetchSubscriptionList();
}
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/KafkaExportProducer.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/KafkaExportProducer.java
new file mode 100644
index 0000000000..64da5632c3
--- /dev/null
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/KafkaExportProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.exporter.provider.kafka;
+
+import com.google.gson.Gson;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+@Slf4j
+public abstract class KafkaExportProducer {
+ protected final ExporterSetting setting;
+ private volatile KafkaProducer<String, Bytes> producer;
+
+ public KafkaExportProducer(ExporterSetting setting) {
+ this.setting = setting;
+ }
+
+ protected KafkaProducer<String, Bytes> getProducer() {
+ if (producer == null) {
+ Properties properties = new Properties();
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, setting.getKafkaBootstrapServers());
+ if (StringUtil.isNotEmpty(setting.getKafkaProducerConfig())) {
+ Gson gson = new Gson();
+ Properties override = gson.fromJson(setting.getKafkaProducerConfig(), Properties.class);
+ properties.putAll(override);
+ }
+ producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
+ }
+ return producer;
+ }
+}
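
A reading aid, not part of the commit: `getProducer()` above first sets `bootstrap.servers` from `kafkaBootstrapServers` and then applies everything parsed from `kafkaProducerConfig` with `putAll`, so a key given in `kafkaProducerConfig` takes precedence over the base value, including `bootstrap.servers` itself. The sketch below reproduces only that merge order with `java.util.Properties`. The class and method names are hypothetical, and the JSON parsing step (done with Gson in the commit) is left out.

```java
import java.util.Properties;

public class ProducerConfigMerge {
    /** The string key behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG in kafka-clients. */
    public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /**
     * Builds producer properties in the same order as getProducer():
     * base bootstrap servers first, then the optional overrides on top.
     */
    public static Properties merge(String bootstrapServers, Properties override) {
        Properties merged = new Properties();
        merged.setProperty(BOOTSTRAP_SERVERS, bootstrapServers);
        if (override != null) {
            merged.putAll(override); // later entries replace earlier ones
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties override = new Properties();
        override.setProperty("acks", "all");
        override.setProperty(BOOTSTRAP_SERVERS, "kafka-a:9092,kafka-b:9092");

        Properties merged = merge("localhost:9092", override);
        System.out.println(merged.getProperty("acks"));
        System.out.println(merged.getProperty(BOOTSTRAP_SERVERS));
    }
}
```

If the bootstrap servers should always come from `kafkaBootstrapServers`, one option would be to set that key after the `putAll` instead; whether that is desirable is a design choice this sketch does not settle.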
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
new file mode 100644
index 0000000000..d8b9245a92
--- /dev/null
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.exporter.provider.kafka.log;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.skywalking.apm.network.logging.v3.JSONLog;
+import org.apache.skywalking.apm.network.logging.v3.LogData;
+import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
+import org.apache.skywalking.apm.network.logging.v3.LogTags;
+import org.apache.skywalking.apm.network.logging.v3.TextLog;
+import org.apache.skywalking.apm.network.logging.v3.TraceContext;
+import org.apache.skywalking.apm.network.logging.v3.YAMLLog;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.exporter.LogExportService;
+import org.apache.skywalking.oap.server.core.query.type.ContentType;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
+import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
+import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+@Slf4j
+public class KafkaLogExporter extends KafkaExportProducer implements LogExportService, IConsumer<LogRecord> {
+ private DataCarrier<LogRecord> exportBuffer;
+ private CounterMetrics successCounter;
+ private CounterMetrics errorCounter;
+ private final ModuleManager moduleManager;
+
+ public KafkaLogExporter(ModuleManager manager, ExporterSetting setting) {
+ super(setting);
+ this.moduleManager = manager;
+ }
+
+ @Override
+ public void start() {
+ super.getProducer();
+ exportBuffer = new DataCarrier<>(
+ "KafkaLogExporter", "KafkaLogExporter", setting.getBufferChannelNum(), setting.getBufferChannelSize(),
+ BufferStrategy.IF_POSSIBLE
+ );
+ exportBuffer.consume(this, 1, 200);
+ MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
+ .provider()
+ .getService(MetricsCreator.class);
+ successCounter = metricsCreator.createCounter(
+ "kafka_exporter_log_success_count", "The success number of log exported by kafka exporter.",
+ new MetricsTag.Keys("protocol"),
+ new MetricsTag.Values("kafka")
+ );
+ errorCounter = metricsCreator.createCounter(
+ "kafka_exporter_log_error_count", "The error number of log exported by kafka exporter",
+ new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka")
+ );
+ }
+
+ @Override
+ public void export(final LogRecord logRecord) {
+ if (logRecord != null) {
+ exportBuffer.produce(logRecord);
+ }
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return setting.isEnableKafkaLog();
+ }
+
+ @Override
+ public void init(final Properties properties) {
+
+ }
+
+ @Override
+ public void consume(final List<LogRecord> data) {
+ for (LogRecord logRecord : data) {
+ if (logRecord != null) {
+ try {
+ LogData logData = transLogData(logRecord);
+ ProducerRecord<String, Bytes> record = new ProducerRecord<>(
+ setting.getKafkaTopicLog(),
+ logRecord.id(),
+ Bytes.wrap(logData.toByteArray())
+ );
+ super.getProducer().send(record, (metadata, ex) -> {
+ if (ex != null) {
+ errorCounter.inc();
+ log.error("Failed to export Log.", ex);
+ } else {
+ successCounter.inc();
+ }
+ });
+ } catch (InvalidProtocolBufferException e) {
+ throw new UnexpectedException(
+ "Failed to parse Log tags from LogRecord, id: " + logRecord.id() + ".", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onError(final List<LogRecord> data, final Throwable t) {
+ log.error("Failed to consume Log records for Kafka export.", t);
+ }
+
+ @Override
+ public void onExit() {
+
+ }
+
+ private LogData transLogData(LogRecord logRecord) throws InvalidProtocolBufferException {
+ LogData.Builder builder = LogData.newBuilder();
+ LogDataBody.Builder bodyBuilder = LogDataBody.newBuilder();
+ switch (ContentType.instanceOf(logRecord.getContentType())) {
+ case JSON:
+ bodyBuilder.setType(ContentType.JSON.name());
+ bodyBuilder.setJson(JSONLog.newBuilder().setJson(logRecord.getContent()));
+ break;
+ case YAML:
+ bodyBuilder.setType(ContentType.YAML.name());
+ bodyBuilder.setYaml(YAMLLog.newBuilder().setYaml(logRecord.getContent()));
+ break;
+ case TEXT:
+ bodyBuilder.setType(ContentType.TEXT.name());
+ bodyBuilder.setText(TextLog.newBuilder().setText(logRecord.getContent()));
+ break;
+ case NONE:
+ bodyBuilder.setType(ContentType.NONE.name());
+ break;
+ default:
+ throw new UnexpectedException(
+ "Failed to parse Log ContentType value: " + logRecord.getContentType() + " from LogRecord, id: " + logRecord.id() + ".");
+ }
+ builder.setBody(bodyBuilder);
+
+ builder.setTimestamp(logRecord.getTimestamp());
+ builder.setService(IDManager.ServiceID.analysisId(logRecord.getServiceId()).getName());
+ if (StringUtil.isNotEmpty(logRecord.getServiceInstanceId())) {
+ builder.setServiceInstance(
+ IDManager.ServiceInstanceID.analysisId(logRecord.getServiceInstanceId()).getName());
+ }
+ if (StringUtil.isNotEmpty(logRecord.getEndpointId())) {
+ builder.setEndpoint(
+ IDManager.EndpointID.analysisId(logRecord.getEndpointId()).getEndpointName());
+ }
+
+ TraceContext.Builder contextBuilder = TraceContext.newBuilder();
+ if (StringUtil.isNotEmpty(logRecord.getTraceSegmentId())) {
+ contextBuilder.setTraceSegmentId(logRecord.getTraceSegmentId());
+ contextBuilder.setSpanId(logRecord.getSpanId());
+ }
+ if (StringUtil.isNotEmpty(logRecord.getTraceId())) {
+ contextBuilder.setTraceId(logRecord.getTraceId());
+ }
+ builder.setTraceContext(contextBuilder);
+ if (logRecord.getTagsRawData() != null) {
+ builder.setTags(LogTags.parseFrom(logRecord.getTagsRawData()));
+ }
+ return builder.build();
+ }
+}
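
Note on the export flow above: export() only hands the record to an in-memory buffer, and a separate consumer sends it asynchronously, counting the outcome in the send callback. The following is a minimal stdlib-only sketch of that pattern, not the exporter's actual code. The class BufferedExportSketch and the Sender interface are hypothetical stand-ins for DataCarrier and the Kafka producer, and it assumes BufferStrategy.IF_POSSIBLE behaves like a non-blocking offer that gives up when the buffer is full.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class BufferedExportSketch {
    /** Hypothetical stand-in for an asynchronous producer send with a completion callback. */
    public interface Sender {
        void send(String key, byte[] value, BiConsumer<String, Exception> callback);
    }

    private final BlockingQueue<String> buffer;
    private final Sender sender;
    public final AtomicLong successCounter = new AtomicLong();
    public final AtomicLong errorCounter = new AtomicLong();

    public BufferedExportSketch(int capacity, Sender sender) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.sender = sender;
    }

    /** Non-blocking hand-off: returns false when the record is null or the buffer is full. */
    public boolean export(String record) {
        return record != null && buffer.offer(record);
    }

    /** Drains the buffer once and sends every record; the callback updates the counters. */
    public int consumeOnce() {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        for (String record : batch) {
            sender.send(record, record.getBytes(StandardCharsets.UTF_8), (key, ex) -> {
                if (ex != null) {
                    errorCounter.incrementAndGet();
                } else {
                    successCounter.incrementAndGet();
                }
            });
        }
        return batch.size();
    }
}
```

The point of the design is that the caller of export() never waits on the network. The cost is that records can be dropped when the buffer is full, which is why the success and error counters only reflect records that actually reached the sender.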
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
new file mode 100644
index 0000000000..243b09ea02
--- /dev/null
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.exporter.provider.kafka.trace;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
+import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
+import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
+import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+@Slf4j
+public class KafkaTraceExporter extends KafkaExportProducer implements TraceExportService, IConsumer<SegmentRecord> {
+ private DataCarrier<SegmentRecord> exportBuffer;
+ private CounterMetrics successCounter;
+ private CounterMetrics errorCounter;
+ private final ModuleManager moduleManager;
+
+ public KafkaTraceExporter(ModuleManager manager, ExporterSetting setting) {
+ super(setting);
+ this.moduleManager = manager;
+ }
+
+ @Override
+ public void start() {
+ super.getProducer();
+ exportBuffer = new DataCarrier<>(
+ "KafkaTraceExporter", "KafkaTraceExporter", setting.getBufferChannelNum(), setting.getBufferChannelSize(),
+ BufferStrategy.IF_POSSIBLE
+ );
+ exportBuffer.consume(this, 1, 200);
+ MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
+ .provider()
+ .getService(MetricsCreator.class);
+ successCounter = metricsCreator.createCounter(
+ "kafka_exporter_trace_success_count", "The success number of traces exported by kafka exporter.",
+ new MetricsTag.Keys("protocol"),
+ new MetricsTag.Values("kafka")
+ );
+ errorCounter = metricsCreator.createCounter(
+ "kafka_exporter_trace_error_count", "The error number of traces exported by kafka exporter",
+ new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka")
+ );
+ }
+
+ @Override
+ public void export(final SegmentRecord segmentRecord) {
+ if (segmentRecord != null) {
+ exportBuffer.produce(segmentRecord);
+ }
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return setting.isEnableKafkaTrace();
+ }
+
+ @Override
+ public void init(final Properties properties) {
+
+ }
+
+ @Override
+ public void consume(final List<SegmentRecord> data) {
+ for (SegmentRecord segmentRecord : data) {
+ if (segmentRecord != null) {
+ try {
+ SegmentObject segmentObject = SegmentObject.parseFrom(segmentRecord.getDataBinary());
+ ProducerRecord<String, Bytes> record = new ProducerRecord<>(
+ setting.getKafkaTopicTrace(),
+ segmentObject.getTraceSegmentId(),
+ Bytes.wrap(segmentObject.toByteArray())
+ );
+ super.getProducer().send(record, (metadata, ex) -> {
+ if (ex != null) {
+ errorCounter.inc();
+ log.error("Failed to export Trace.", ex);
+ } else {
+ successCounter.inc();
+ }
+ });
+ } catch (InvalidProtocolBufferException e) {
+ throw new UnexpectedException(
+ "Failed to parse SegmentObject from SegmentRecord, id: " + segmentRecord.getSegmentId() + ".", e
+ );
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onError(final List<SegmentRecord> data, final Throwable t) {
+ log.error("Failed to consume Trace segments for Kafka export.", t);
+ }
+
+ @Override
+ public void onExit() {
+
+ }
+}
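
Note on consume() above: the InvalidProtocolBufferException is rethrown as UnexpectedException from inside the loop, so the remaining records of that batch are not sent. One possible alternative, shown only as a hedged sketch, is to isolate the failure per record. SkipBadRecordsSketch and its decoder parameter are hypothetical; the decoder stands in for a call such as SegmentObject.parseFrom, with a checked parsing exception assumed to be wrapped in a RuntimeException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SkipBadRecordsSketch {
    /** Result of one batch: decoded payloads plus the number of records that failed to decode. */
    public static final class BatchResult<R> {
        public final List<R> decoded = new ArrayList<>();
        public int failed;
    }

    /**
     * Decodes each record independently so one malformed record does not abort the batch.
     * The decoder is hypothetical; it stands in for a protobuf parse step.
     */
    public static <T, R> BatchResult<R> decodeAll(List<T> batch, Function<T, R> decoder) {
        BatchResult<R> result = new BatchResult<>();
        for (T record : batch) {
            if (record == null) {
                continue; // same null guard as the exporter loop
            }
            try {
                result.decoded.add(decoder.apply(record));
            } catch (RuntimeException e) {
                // A real exporter would also log the failure and bump an error counter.
                result.failed++;
            }
        }
        return result;
    }
}
```

Whether skipping is preferable to failing the batch depends on how much a silent gap in exported data matters to the consumer of the topic; the commit itself chooses to fail.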
diff --git a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
index 0e65d5bdd0..56bedd4ae7 100644
--- a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ b/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -16,4 +16,4 @@
#
#
-org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCExporterProvider
\ No newline at end of file
+org.apache.skywalking.oap.server.exporter.provider.ExporterProvider
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
index 658a8ca02f..d2d307f200 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java
@@ -22,6 +22,8 @@ import java.util.Iterator;
import java.util.ServiceLoader;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.exporter.ExporterModule;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterProvider;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
@@ -53,17 +55,17 @@ public class GRPCExporterProviderTest {
assertTrue(moduleProviderIterator.hasNext());
grpcExporterProvider = moduleProviderIterator.next();
- assertTrue(grpcExporterProvider instanceof GRPCExporterProvider);
+ assertTrue(grpcExporterProvider instanceof ExporterProvider);
- GRPCExporterSetting config = (GRPCExporterSetting) grpcExporterProvider.createConfigBeanIfAbsent();
+ ExporterSetting config = (ExporterSetting) grpcExporterProvider.createConfigBeanIfAbsent();
assertNotNull(config);
- assertNull(config.getTargetHost());
- assertEquals(0, config.getTargetPort());
+ assertNull(config.getGRPCTargetHost());
+ assertEquals(0, config.getGRPCTargetPort());
assertEquals(20000, config.getBufferChannelSize());
assertEquals(2, config.getBufferChannelNum());
//for test
- config.setTargetHost("localhost");
+ config.setGRPCTargetHost("localhost");
grpcExporterProvider.prepare();
@@ -72,7 +74,7 @@ public class GRPCExporterProviderTest {
@Test
public void name() {
- assertEquals("grpc", grpcExporterProvider.name());
+ assertEquals("default", grpcExporterProvider.name());
}
@Test
@@ -82,7 +84,7 @@ public class GRPCExporterProviderTest {
@Test
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- GRPCExporter exporter = mock(GRPCExporter.class);
+ GRPCMetricsExporter exporter = mock(GRPCMetricsExporter.class);
ModuleManager manager = mock(ModuleManager.class);
ModuleProviderHolder providerHolder = mock(ModuleProviderHolder.class);
@@ -95,7 +97,7 @@ public class GRPCExporterProviderTest {
doNothing().when(exporter).fetchSubscriptionList();
grpcExporterProvider.setManager(manager);
- Whitebox.setInternalState(grpcExporterProvider, "exporter", exporter);
+ Whitebox.setInternalState(grpcExporterProvider, "grpcMetricsExporter", exporter);
grpcExporterProvider.notifyAfterCompleted();
}
@@ -106,4 +108,4 @@ public class GRPCExporterProviderTest {
assertEquals(1, requireModules.length);
assertEquals("core", requireModules[0]);
}
-}
\ No newline at end of file
+}
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index f4838408da..54cd928227 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.exporter.ExportData;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
+import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -37,7 +38,7 @@ import static org.apache.skywalking.oap.server.core.exporter.ExportEvent.EventTy
public class GRPCExporterTest {
- private GRPCExporter exporter;
+ private GRPCMetricsExporter exporter;
@Rule
public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@@ -49,13 +50,14 @@ public class GRPCExporterTest {
@Before
public void setUp() throws Exception {
- GRPCExporterSetting setting = new GRPCExporterSetting();
- setting.setTargetHost("localhost");
- setting.setTargetPort(9870);
- exporter = new GRPCExporter(setting);
+ ExporterSetting setting = new ExporterSetting();
+ setting.setGRPCTargetHost("localhost");
+ setting.setGRPCTargetPort(9870);
+ exporter = new GRPCMetricsExporter(setting);
grpcServerRule.getServiceRegistry().addService(service);
stub = MetricExportServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
Whitebox.setInternalState(exporter, "blockingStub", stub);
+ exporter.start();
}
@Test
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 43ccd4371f..39e4ac29d6 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -51,9 +51,6 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
- <kafka-clients.version>2.4.1</kafka-clients.version>
- <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
</properties>
<dependencies>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportMetricsWorker.java
similarity index 88%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportMetricsWorker.java
index 63090bc3ff..a4bf9389b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportMetricsWorker.java
@@ -28,10 +28,10 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
* A bridge worker. If the {@link ExporterModule} provider declared and provides a implementation of {@link
* MetricValuesExportService}, forward the export data to it.
*/
-public class ExportWorker extends AbstractWorker<ExportEvent> {
+public class ExportMetricsWorker extends AbstractWorker<ExportEvent> {
private MetricValuesExportService exportService;
- public ExportWorker(ModuleDefineHolder moduleDefineHolder) {
+ public ExportMetricsWorker(ModuleDefineHolder moduleDefineHolder) {
super(moduleDefineHolder);
}
@@ -43,8 +43,9 @@ public class ExportWorker extends AbstractWorker<ExportEvent> {
.provider()
.getService(MetricValuesExportService.class);
}
- exportService.export(event);
+ if (exportService.isEnabled()) {
+ exportService.export(event);
+ }
}
}
-
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportRecordWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportRecordWorker.java
new file mode 100644
index 0000000000..2a59a0af81
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportRecordWorker.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.exporter.ExporterModule;
+import org.apache.skywalking.oap.server.core.exporter.LogExportService;
+import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+
+public class ExportRecordWorker extends AbstractWorker<Record> {
+ private TraceExportService traceExportService;
+ private LogExportService logExportService;
+
+ public ExportRecordWorker(ModuleDefineHolder moduleDefineHolder) {
+ super(moduleDefineHolder);
+ }
+
+ @Override
+ public void in(Record record) {
+ if (record instanceof SegmentRecord) {
+ if (traceExportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) {
+ if (traceExportService == null) {
+ traceExportService = getModuleDefineHolder().find(ExporterModule.NAME)
+ .provider()
+ .getService(TraceExportService.class);
+ }
+ if (traceExportService.isEnabled()) {
+ traceExportService.export((SegmentRecord) record);
+ }
+ }
+ } else if (record instanceof LogRecord) {
+ if (logExportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) {
+ if (logExportService == null) {
+ logExportService = getModuleDefineHolder().find(ExporterModule.NAME)
+ .provider()
+ .getService(LogExportService.class);
+ }
+ if (logExportService.isEnabled()) {
+ logExportService.export((LogRecord) record);
+ }
+ }
+ }
+ }
+}
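
Note on ExportRecordWorker above: it resolves each export service lazily on first use, caches it, and forwards a record only when the matching service reports isEnabled(). A simplified stdlib-only sketch of that dispatch follows. All names in it (RecordDispatchSketch, TraceRec, LogRec, CountingService) are hypothetical, and the Supplier arguments stand in for the module lookup.

```java
import java.util.function.Supplier;

public class RecordDispatchSketch {
    /** Simplified stand-in for the ExporterService<T> interface in this commit (start() omitted). */
    public interface ExporterService<T> {
        void export(T data);
        boolean isEnabled();
    }

    // Hypothetical record types; the commit dispatches on SegmentRecord and LogRecord.
    public static final class TraceRec { }
    public static final class LogRec { }

    /** Test helper that counts how many records it was asked to export. */
    public static final class CountingService<T> implements ExporterService<T> {
        private final boolean enabled;
        public int exported;

        public CountingService(boolean enabled) {
            this.enabled = enabled;
        }

        @Override
        public void export(T data) {
            exported++;
        }

        @Override
        public boolean isEnabled() {
            return enabled;
        }
    }

    private final Supplier<ExporterService<TraceRec>> traceLookup;
    private final Supplier<ExporterService<LogRec>> logLookup;
    private ExporterService<TraceRec> traceService; // resolved on first use, then cached
    private ExporterService<LogRec> logService;

    public RecordDispatchSketch(Supplier<ExporterService<TraceRec>> traceLookup,
                                Supplier<ExporterService<LogRec>> logLookup) {
        this.traceLookup = traceLookup;
        this.logLookup = logLookup;
    }

    /** Routes a record to the matching service; other record types are ignored. */
    public void in(Object record) {
        if (record instanceof TraceRec) {
            if (traceService == null) {
                traceService = traceLookup.get();
            }
            if (traceService.isEnabled()) {
                traceService.export((TraceRec) record);
            }
        } else if (record instanceof LogRec) {
            if (logService == null) {
                logService = logLookup.get();
            }
            if (logService.isEnabled()) {
                logService.export((LogRec) record);
            }
        }
    }
}
```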
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 1fb1d41ae7..7885c0738e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -205,7 +205,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsTransWorker transWorker,
boolean supportUpdate) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
- ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
+ ExportMetricsWorker exportWorker = new ExportMetricsWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker,
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 0ef237f0c1..757ef24f77 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.io.IOException;
+import java.util.Optional;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
@@ -37,12 +38,14 @@ public class RecordPersistentWorker extends AbstractWorker<Record> {
private final Model model;
private final IRecordDAO recordDAO;
private final IBatchDAO batchDAO;
+ private final Optional<AbstractWorker<Record>> nextExportWorker;
- RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
+ RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO, AbstractWorker<Record> nextExportWorker) {
super(moduleDefineHolder);
this.model = model;
this.recordDAO = recordDAO;
this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
+ this.nextExportWorker = Optional.ofNullable(nextExportWorker);
}
@Override
@@ -53,5 +56,6 @@ public class RecordPersistentWorker extends AbstractWorker<Record> {
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
+ this.nextExportWorker.ifPresent(exportWorker -> exportWorker.in(record));
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index e1c586d830..893152794a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -76,7 +76,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
// Record stream doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
Model model = modelSetter.add(
recordClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.Second), true);
- RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
+ ExportRecordWorker exportWorker = new ExportRecordWorker(moduleDefineHolder);
+ RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO, exportWorker);
workers.put(recordClass, persistentWorker);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
index 8a409a3358..9be78c111e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
@@ -29,6 +29,9 @@ public class ExporterModule extends ModuleDefine {
@Override
public Class[] services() {
- return new Class[] {MetricValuesExportService.class};
+ return new Class[] {
+ MetricValuesExportService.class,
+ TraceExportService.class,
+ LogExportService.class};
}
}
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
similarity index 67%
rename from oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
index 74eaa483a7..a322dae22a 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java
@@ -16,17 +16,15 @@
*
*/
-package org.apache.skywalking.oap.server.exporter.provider.grpc;
+package org.apache.skywalking.oap.server.core.exporter;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.Service;
-@Setter
-@Getter
-public class GRPCExporterSetting extends ModuleConfig {
- private String targetHost;
- private int targetPort;
- private int bufferChannelSize = 20000;
- private int bufferChannelNum = 2;
+public interface ExporterService<T> extends Service {
+
+ void start();
+
+ void export(T data);
+
+ boolean isEnabled();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/LogExportService.java
similarity index 71%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/LogExportService.java
index 8a409a3358..7c20a2e7b6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/LogExportService.java
@@ -18,17 +18,13 @@
package org.apache.skywalking.oap.server.core.exporter;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.library.module.Service;
-public class ExporterModule extends ModuleDefine {
- public static final String NAME = "exporter";
-
- public ExporterModule() {
- super(NAME);
- }
+/**
+ * Export the logs through this service.
+ */
+public interface LogExportService extends Service, ExporterService<LogRecord> {
- @Override
- public Class[] services() {
- return new Class[] {MetricValuesExportService.class};
- }
+ void export(LogRecord logRecord);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
index dd83788fd3..9ee953cc3f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
@@ -21,9 +21,9 @@ package org.apache.skywalking.oap.server.core.exporter;
import org.apache.skywalking.oap.server.library.module.Service;
/**
- * Export the metrics value from metrics through this service, if provider exists.
+ * Export the metric values through this service.
*/
-public interface MetricValuesExportService extends Service {
+public interface MetricValuesExportService extends Service, ExporterService<ExportEvent> {
/**
* This method is sync-mode export, the performance effects the persistence result. Queue mode is highly
* recommended.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/TraceExportService.java
similarity index 71%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/TraceExportService.java
index 8a409a3358..3da68a72bd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/TraceExportService.java
@@ -18,17 +18,13 @@
package org.apache.skywalking.oap.server.core.exporter;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.library.module.Service;
-public class ExporterModule extends ModuleDefine {
- public static final String NAME = "exporter";
-
- public ExporterModule() {
- super(NAME);
- }
+/**
+ * Export the traces through this service.
+ */
+public interface TraceExportService extends Service, ExporterService<SegmentRecord> {
- @Override
- public Class[] services() {
- return new Class[] {MetricValuesExportService.class};
- }
+ void export(SegmentRecord segmentRecord);
}
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml
index b654e11149..b74b9c3dd2 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/pom.xml
@@ -42,13 +42,10 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
- <version>${spring-kafka-test.version}</version>
- <scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
index 3c08d83564..500595f23d 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
@@ -36,7 +36,6 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>${kafka-clients.version}</version>
</dependency>
</dependencies>
</project>
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 9c4512cd0e..fba48e31e2 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -501,9 +501,19 @@ configuration:
exporter:
selector: ${SW_EXPORTER:-}
- grpc:
- targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
- targetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+ default:
+ # gRPC exporter
+ enableGRPCMetrics: ${SW_EXPORTER_ENABLE_GRPC_METRICS:false}
+ gRPCTargetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
+ gRPCTargetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+ # Kafka exporter
+ enableKafkaTrace: ${SW_EXPORTER_ENABLE_KAFKA_TRACE:false}
+ enableKafkaLog: ${SW_EXPORTER_ENABLE_KAFKA_LOG:false}
+ kafkaBootstrapServers: ${SW_EXPORTER_KAFKA_SERVERS:localhost:9092}
+ # Kafka producer config, JSON format as Properties.
+ kafkaProducerConfig: ${SW_EXPORTER_KAFKA_PRODUCER_CONFIG:""}
+ kafkaTopicTrace: ${SW_EXPORTER_KAFKA_TOPIC_TRACE:skywalking-export-trace}
+ kafkaTopicLog: ${SW_EXPORTER_KAFKA_TOPIC_LOG:skywalking-export-log}
health-checker:
selector: ${SW_HEALTH_CHECKER:-}
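
Note on the settings above: each value uses the ${ENV_NAME:default} form, where the environment variable wins when it is set and the text after the first colon is the fallback. The sketch below illustrates that reading of the syntax only; PlaceholderSketch is a hypothetical helper, not the OAP server's own resolver, whose parsing rules may differ.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {
    // Name runs up to the first ':'; the default is everything up to the closing brace.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+):([^}]*)\\}");

    /** Replaces each ${NAME:default} with env.get(NAME), or with the default when NAME is unset. */
    public static String resolve(String value, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String resolved = env.getOrDefault(matcher.group(1), matcher.group(2));
            matcher.appendReplacement(out, Matcher.quoteReplacement(resolved));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

Under this reading, the default of SW_EXPORTER_KAFKA_SERVERS keeps its own colon (localhost:9092) because only the first colon separates the name from the default.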
diff --git a/test/e2e-v2/cases/exporter/kafka/docker-compose.yml b/test/e2e-v2/cases/exporter/kafka/docker-compose.yml
new file mode 100644
index 0000000000..bc188e4d6d
--- /dev/null
+++ b/test/e2e-v2/cases/exporter/kafka/docker-compose.yml
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+  zookeeper:
+    image: zookeeper:3.4
+    hostname: zookeeper
+    expose:
+      - 2181
+    networks:
+      - e2e
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+    healthcheck:
+      test: [ "CMD", "sh", "-c", "nc -nz 127.0.0.1 2181" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  broker-a:
+    image: bitnami/kafka:2.4.1
+    hostname: broker-a
+    expose:
+      - 9092
+    networks:
+      - e2e
+    environment:
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_BROKER_ID=10
+      - ALLOW_PLAINTEXT_LISTENER=yes
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+    healthcheck:
+      test: [ "CMD", "kafka-topics.sh", "--list", "--zookeeper", "zookeeper:2181" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  oap:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: oap
+    environment:
+      SW_EXPORTER: default
+      SW_EXPORTER_ENABLE_KAFKA_TRACE: "true"
+      SW_EXPORTER_ENABLE_KAFKA_LOG: "true"
+      SW_EXPORTER_KAFKA_SERVERS: broker-a:9092
+      SW_TELEMETRY: prometheus
+    ports:
+      - 1234
+      - 12800
+    depends_on:
+      broker-a:
+        condition: service_healthy
+    networks:
+      - e2e
+
+  provider:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: provider
+    ports:
+      - 9090
+    depends_on:
+      oap:
+        condition: service_healthy
+
+  consumer:
+    extends:
+      file: ../../../script/docker-compose/base-compose.yml
+      service: consumer
+    ports:
+      - 9092
+    depends_on:
+      provider:
+        condition: service_healthy
+networks:
+  e2e:
diff --git a/test/e2e-v2/cases/exporter/kafka/e2e.yaml b/test/e2e-v2/cases/exporter/kafka/e2e.yaml
new file mode 100644
index 0000000000..d7220c4cfa
--- /dev/null
+++ b/test/e2e-v2/cases/exporter/kafka/e2e.yaml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+  env: compose
+  file: docker-compose.yml
+  timeout: 20m
+  init-system-environment: ../../../script/env
+  steps:
+    - name: set PATH
+      command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
+    - name: install yq
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+    - name: install swctl
+      command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+trigger:
+  action: http
+  interval: 3s
+  times: 10
+  url: http://${consumer_host}:${consumer_9092}/users
+  method: POST
+  body: '{"id":"123","name":"skywalking"}'
+  headers:
+    "Content-Type": "application/json"
+
+verify:
+  retry:
+    count: 20
+    interval: 3s
+  cases:
+    - includes:
+        - exporter-cases.yaml
diff --git a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/test/e2e-v2/cases/exporter/kafka/expected/result.yml
similarity index 87%
copy from oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
copy to test/e2e-v2/cases/exporter/kafka/expected/result.yml
index 0e65d5bdd0..2f4eac1576 100644
--- a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ b/test/e2e-v2/cases/exporter/kafka/expected/result.yml
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -13,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-#
-org.apache.skywalking.oap.server.exporter.provider.grpc.GRPCExporterProvider
\ No newline at end of file
+# "1" represents all received traces/logs were exported successfully: {exported / received == 1 }
+result: 1
diff --git a/test/e2e-v2/cases/exporter/kafka/exporter-cases.yaml b/test/e2e-v2/cases/exporter/kafka/exporter-cases.yaml
new file mode 100644
index 0000000000..c77185c388
--- /dev/null
+++ b/test/e2e-v2/cases/exporter/kafka/exporter-cases.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cases:
+  # verify traces
+  - query: |
+      sleep 10;
+      trace_in=$(curl -s http://${oap_host}:${oap_1234} | grep trace_in_latency_count{ | awk '{print $2}'); \
+      trace_out=$(curl -s http://${oap_host}:${oap_1234} | grep kafka_exporter_trace_success_count{ | awk '{print $2}'); \
+      awk 'BEGIN {print '$trace_out/$trace_in'}' | yq e '{"result": .}' -
+    expected: expected/result.yml
+
+  # verify logs
+  - query: |
+      curl -X POST http://${provider_host}:${provider_9090}/logs/trigger > /dev/null;
+      curl -X POST http://${provider_host}:${provider_9090}/logs/trigger > /dev/null;
+      curl -X POST http://${provider_host}:${provider_9090}/logs/trigger > /dev/null;
+      sleep 10;
+      log_in=$(curl -s http://${oap_host}:${oap_1234} | grep log_in_latency_count{ | awk '{print $2}'); \
+      log_out=$(curl -s http://${oap_host}:${oap_1234} | grep kafka_exporter_log_success_count{ | awk '{print $2}'); \
+      awk 'BEGIN {print '$log_out/$log_in'}' | yq e '{"result": .}' -
+    expected: expected/result.yml
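Both queries above reduce to the same check: read two counters from the OAP telemetry endpoint and confirm that exported / received equals 1. The Python sketch below restates that check on a captured response body. It assumes the endpoint returns Prometheus text exposition format (one `name{labels} value` sample per line), which is what the `grep ... | awk '{print $2}'` pipeline implies. The function names, the sample payload, and its label names are hypothetical; the metric names are the ones used in the queries. Unlike the shell pipeline, the sketch sums all matching series instead of expecting exactly one.

```python
def read_counter(metrics_text: str, prefix: str) -> float:
    """Sum the values of every sample line that starts with `prefix`."""
    total = 0.0
    for line in metrics_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        if line.startswith(prefix):
            # The value is the first field after the closing brace of the labels.
            _, _, rest = line.rpartition("}")
            total += float(rest.split()[0])
    return total


def export_ratio(metrics_text: str, received_prefix: str, exported_prefix: str) -> float:
    """Return exported / received; 1.0 means everything received was exported."""
    received = read_counter(metrics_text, received_prefix)
    exported = read_counter(metrics_text, exported_prefix)
    if received == 0:
        raise ValueError("no received samples yet")
    return exported / received


# Made-up payload for illustration; the label names are not taken from the OAP.
sample = """\
# TYPE trace_in_latency_count counter
trace_in_latency_count{protocol="grpc",} 10.0
kafka_exporter_trace_success_count{instance="oap",} 10.0
"""
ratio = export_ratio(
    sample, "trace_in_latency_count{", "kafka_exporter_trace_success_count{"
)
```

The same function covers the log case by passing `log_in_latency_count{` and `kafka_exporter_log_success_count{` as the two prefixes.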