You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/02/26 09:18:15 UTC
[skywalking] 01/01: Make Zipkin trace query available
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch remove-zipkin-analysis
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit c503cb54e3ce43b52a4c2f0ec5ed918d9d6deb2e
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Feb 26 17:07:29 2021 +0800
Make Zipkin trace query available
---
CHANGES.md | 7 +-
README.md | 2 +-
docs/en/setup/backend/backend-receivers.md | 67 +---
docs/en/setup/backend/backend-storage.md | 38 +-
docs/en/setup/backend/configuration-vocabulary.md | 3 -
oap-server/pom.xml | 6 -
.../src/main/resources/application.yml | 27 ++
.../server/core/storage/query/ITraceQueryDAO.java | 2 +-
.../jaeger-receiver-plugin/pom.xml | 47 ---
.../server/receiver/jaeger/JaegerGRPCHandler.java | 114 ------
.../receiver/jaeger/JaegerReceiverConfig.java | 34 --
.../receiver/jaeger/JaegerReceiverModule.java | 37 --
.../receiver/jaeger/JaegerReceiverProvider.java | 107 -----
...ywalking.oap.server.library.module.ModuleDefine | 20 -
...alking.oap.server.library.module.ModuleProvider | 19 -
oap-server/server-receiver-plugin/pom.xml | 1 -
.../src/main/proto/jaeger/collector.proto | 66 ----
.../src/main/proto/jaeger/model.proto | 160 --------
.../zipkin-receiver-plugin/pom.xml | 16 +-
.../receiver/zipkin/ZipkinReceiverConfig.java | 3 -
.../receiver/zipkin/ZipkinReceiverProvider.java | 21 +-
.../zipkin/analysis/Receiver2AnalysisBridge.java | 40 --
.../zipkin/analysis/ZipkinSkyWalkingTransfer.java | 33 --
.../zipkin/analysis/cache/CacheFactory.java | 42 --
.../receiver/zipkin/analysis/cache/ISpanCache.java | 25 --
.../analysis/cache/caffeine/CaffeineSpanCache.java | 92 -----
.../zipkin/analysis/data/SkyWalkingTrace.java | 33 --
.../receiver/zipkin/analysis/data/ZipkinTrace.java | 56 ---
.../zipkin/analysis/transform/SegmentBuilder.java | 437 ---------------------
.../zipkin/analysis/transform/SegmentListener.java | 25 --
.../transform/Zipkin2SkyWalkingTransfer.java | 48 ---
.../receiver/zipkin/handler/SpanProcessor.java | 20 +-
.../zipkin/handler/SpanV1JettyHandler.java | 17 +-
.../zipkin/handler/SpanV2JettyHandler.java | 16 +-
.../server/receiver/zipkin/trace/SpanForward.java | 62 +--
.../transform/SpringSleuthSegmentBuilderTest.java | 117 ------
oap-server/server-starter-es7/pom.xml | 5 +
oap-server/server-starter/pom.xml | 10 -
oap-server/server-storage-plugin/pom.xml | 3 +-
.../storage-jaeger-plugin/pom.xml | 42 --
.../server/storage/plugin/jaeger/JaegerSpan.java | 78 ----
.../storage/plugin/jaeger/JaegerSpanRecord.java | 156 --------
.../plugin/jaeger/JaegerSpanRecordDispatcher.java | 48 ---
.../JaegerStorageModuleElasticsearchProvider.java | 45 ---
.../elasticsearch/JaegerTraceQueryEsDAO.java | 309 ---------------
...alking.oap.server.library.module.ModuleProvider | 19 -
.../pom.xml | 12 +-
.../server/storage/plugin/zipkin/ZipkinSpan.java | 5 +
.../storage/plugin/zipkin/ZipkinSpanRecord.java | 8 +
.../plugin/zipkin/ZipkinSpanRecordDispatcher.java | 1 +
.../ZipkinStorageModuleElasticsearchProvider.java | 8 +-
.../elasticsearch/ZipkinTraceQueryEs7DAO.java} | 66 ++--
...alking.oap.server.library.module.ModuleProvider | 0
.../known-oap-backend-dependencies.txt | 1 -
54 files changed, 179 insertions(+), 2497 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bebd038..8526fd0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -31,13 +31,18 @@ Release Notes.
* Introduce log analysis language (LAL).
* Fix alarm httpclient connection leak.
* Add `sum` function in meter system.
+* Remove Jaeger receiver.
+* Remove the experimental Zipkin span analyzer.
+* Upgrade the Zipkin Elasticsearch storage from 6 to 7.
+* Require Zipkin receiver must work with `zipkin-elasticsearch7` storage option.
#### UI
* Update selector scroller to show in all pages.
* Implement searching logs with date.
#### Documentation
-
+* Polish documentation due to we have covered all tracing, logging, and metrics fields.
+* Adjust documentation about Zipkin receiver.
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/76?closed=1)
diff --git a/README.md b/README.md
index 168e6e4..b38b276 100644
--- a/README.md
+++ b/README.md
@@ -43,7 +43,7 @@ including
1. Service Mesh Observability. Control panel and data panel.
1. Metrics system, including Prometheus, OpenTelemetry, Spring Sleuth(Micrometer), Zabbix.
1. Logs.
-1. Zipkin v1/v2 and Jaeger gRPC format with limited topology and metrics analysis.(Experimental).
+1. Zipkin v1/v2 trace.(No Analysis)
SkyWalking OAP is using the STAM(Streaming Topology Analysis Method) to analysis topology in the tracing based agent scenario
for better performance. Read [the paper of STAM](https://wu-sheng.github.io/STAM/) for more details.
diff --git a/docs/en/setup/backend/backend-receivers.md b/docs/en/setup/backend/backend-receivers.md
index 3edf3bd..4512aed 100644
--- a/docs/en/setup/backend/backend-receivers.md
+++ b/docs/en/setup/backend/backend-receivers.md
@@ -18,9 +18,8 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **configuration-discovery**. gRPC services handle configurationDiscovery.
1. **receiver-event**. gRPC services to handle events data.
1. **receiver-zabbix**. See [details](backend-zabbix.md).
-1. Experimental receivers. All following receivers are in the POC stage, not production ready.
- 1. **receiver_zipkin**. See [details](#zipkin-receiver). (Experimental)
- 1. **receiver_jaeger**. See [details](#jaeger-receiver). (Experimental)
+1. Experimental receivers.
+ 1. **receiver_zipkin**. See [details](#zipkin-receiver).
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
@@ -147,18 +146,11 @@ receiver-meter:
default:
```
-## Experimental receivers
-All following receivers are in the POC stage, not production ready.
-
-### Zipkin receiver
-Zipkin receiver could work in two different mode.
-1. Tracing mode(default). Tracing mode is that, skywalking OAP acts like zipkin collector,
- fully supports Zipkin v1/v2 formats through HTTP service,
- also provide persistence and query in skywalking UI.
- But it wouldn't analysis metrics from them. In most case, I suggest you could use this feature, when metrics come from service mesh.
- Notice, in this mode, Zipkin receiver requires `zipkin-elasticsearch` storage implementation active.
- Read [this](backend-storage.md#elasticsearch-6-with-zipkin-trace-extension) to know
- how to active.
+## Zipkin receiver
+Zipkin receiver makes the OAP server as an alternative Zipkin server implementation. It supports Zipkin v1/v2 formats through HTTP service.
+Make sure you use this with `SW_STORAGE=zipkin-elasticsearch7` option to activate Zipkin storage implementation.
+Once this receiver and storage activated, SkyWalking native traces would be ignored, and SkyWalking wouldn't analysis topology, metrics, endpoint
+dependency from Zipkin's trace.
Use following config to active.
```yaml
@@ -175,45 +167,6 @@ receiver_zipkin:
jettyAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_QUEUE_SIZE:0}
```
-2. Analysis mode(Not production ready), receive Zipkin v1/v2 formats through HTTP service. Transform the trace to skywalking
- native format, and analysis like skywalking trace. This feature can't work in production env right now,
- because of Zipkin tag/endpoint value unpredictable, we can't make sure it fits production env requirements.
-
-Active `analysis mode`, you should set `needAnalysis` config.
-```yaml
-receiver_zipkin:
- selector: ${SW_RECEIVER_ZIPKIN:-}
- default:
- host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
- port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
- contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
- jettyMinThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MIN_THREADS:1}
- jettyMaxThreads: ${SW_RECEIVER_ZIPKIN_JETTY_MAX_THREADS:200}
- jettyIdleTimeOut: ${SW_RECEIVER_ZIPKIN_JETTY_IDLE_TIMEOUT:30000}
- jettyAcceptorPriorityDelta: ${SW_RECEIVER_ZIPKIN_JETTY_DELTA:0}
- jettyAcceptQueueSize: ${SW_RECEIVER_ZIPKIN_QUEUE_SIZE:0}
- needAnalysis: true
-```
-
-NOTICE, Zipkin receiver is only provided in `apache-skywalking-apm-x.y.z.tar.gz` tar.
-
-### Jaeger receiver
-Jaeger receiver right now only works in `Tracing Mode`, and no analysis.
-Jaeger receiver provides extra gRPC host/port, if absent, sharing-server host/port will be used, then core gRPC host/port.
-Receiver requires `jaeger-elasticsearch` storage implementation active.
-Read [this](backend-storage.md#elasticsearch-6-with-jaeger-trace-extension) to know how to active.
-
-Right now, you need [jaeger agent](https://www.jaegertracing.io/docs/1.11/architecture/#agent) to batch
-send spans to SkyWalking oap server. Read [Jaeger Architecture](https://www.jaegertracing.io/docs/1.11/architecture/)
-to get more details.
-
-Active the receiver.
-```yaml
-receiver_jaeger:
- selector: ${SW_RECEIVER_JAEGER:-}
- default:
- gRPCHost: ${SW_RECEIVER_JAEGER_HOST:0.0.0.0}
- gRPCPort: ${SW_RECEIVER_JAEGER_PORT:14250}
-```
-
-NOTICE, Jaeger receiver is only provided in `apache-skywalking-apm-x.y.z.tar.gz` tar.
+NOTICE, Zipkin receiver is only provided in `apache-skywalking-apm-es7-x.y.z.tar.gz` tar.
+And this requires `zipkin-elasticsearch7` storage implementation active.
+Read [this](backend-storage.md#elasticsearch-7-with-zipkin-trace-extension) doc to know Zipkin as storage option.
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index da2acb0..e3f83a7 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -15,9 +15,6 @@ Native supported storage
- InfluxDB
- PostgreSQL
-Redistribution version with supported storage.
-- ElasticSearch 5
-
## H2
Active H2 as storage, set storage provider to **H2** In-Memory Databases. Default in distribution package.
@@ -155,13 +152,13 @@ We strongly advice you to read more about these configurations from ElasticSearc
This effects the performance of ElasticSearch very much.
-### ElasticSearch 6 with Zipkin trace extension
-This implementation shares most of `elasticsearch`, just extend to support zipkin span storage.
+### ElasticSearch 7 with Zipkin trace extension
+This implementation shares most of `elasticsearch7`, just extends to support zipkin span storage.
It has all same configs.
```yaml
storage:
- selector: ${SW_STORAGE:zipkin-elasticsearch}
- zipkin-elasticsearch:
+ selector: ${SW_STORAGE:zipkin-elasticsearch7}
+ zipkin-elasticsearch7:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
@@ -176,32 +173,9 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
-### ElasticSearch 6 with Jaeger trace extension
-This implementation shares most of `elasticsearch`, just extend to support jaeger span storage.
-It has all same configs.
-```yaml
-storage:
- selector: ${SW_STORAGE:jaeger-elasticsearch}
- jaeger-elasticsearch:
- nameSpace: ${SW_NAMESPACE:""}
- clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
- protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
- user: ${SW_ES_USER:""}
- password: ${SW_ES_PASSWORD:""}
- indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
- indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
- # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
- bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
- flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
- concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
-```
-
-
### About Namespace
When namespace is set, names of all indexes in ElasticSearch will use it as prefix.
-
## MySQL
Active MySQL as storage, set storage provider to **mysql**.
@@ -289,10 +263,6 @@ storage:
All connection related settings including link url, username and password are in `application.yml`.
Here are some of the settings, please follow [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for all the settings.
-## ElasticSearch 5
-ElasticSearch 5 is incompatible with ElasticSearch 6 Java client jar, so it could not be included in native distribution.
-[OpenSkyWalking/SkyWalking-With-Es5x-Storage](https://github.com/OpenSkywalking/SkyWalking-With-Es5x-Storage) repo includes the distribution version.
-
## More storage solution extension
Follow [Storage extension development guide](../../guides/storage-extention.md)
in [Project Extensions document](../../guides/README.md#project-extensions) in development guide.
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 428bad3..1e2b2d5 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -194,9 +194,6 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | restHost| Binding IP of restful service. |SW_RECEIVER_ZIPKIN_HOST|0.0.0.0|
| - | - | restPort | Binding port of restful service | SW_RECEIVER_ZIPKIN_PORT|9411|
| - | - | restContextPath| Web context path of restful service| SW_RECEIVER_ZIPKIN_CONTEXT_PATH|/|
-| - | - | needAnalysis|Analysis zipkin span to generate metrics| - | false|
-| - | - | maxCacheSize| Max cache size for span analysis | - | 1_000_000 |
-| - | - | expireTime| The expire time of analysis cache, unit is second. | - | 20|
| receiver_jaeger | default| Read [receiver doc](backend-receivers.md) | - | - |
| - | - | gRPCHost|Binding IP of gRPC service. Services include gRPC data report and internal communication among OAP nodes| SW_RECEIVER_JAEGER_HOST | - |
| - | - | gRPCPort| Binding port of gRPC service | SW_RECEIVER_JAEGER_PORT | - |
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 15d76ca..1fcb42f 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -71,7 +71,6 @@
<kubernetes.version>10.0.0</kubernetes.version>
<hikaricp.version>3.1.0</hikaricp.version>
<zipkin.version>2.9.1</zipkin.version>
- <caffeine.version>2.6.2</caffeine.version>
<okhttp.version>3.9.0</okhttp.version>
<jackson-core.version>2.9.5</jackson-core.version>
<jackson-annotations.version>2.9.5</jackson-annotations.version>
@@ -363,11 +362,6 @@
<artifactId>zipkin</artifactId>
<version>${zipkin.version}</version>
</dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- <version>${caffeine.version}</version>
- </dependency>
<!-- -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 6ff73d9..91ea490 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -213,6 +213,33 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+ zipkin-elasticsearch7:
+ nameSpace: ${SW_NAMESPACE:""}
+ clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
+ protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
+ trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
+ trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
+ dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
+ indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number of new indexes
+ indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:1} # Replicas number of new indexes
+ # Super data set has been defined in the codes, such as trace segments.The following 3 config would be improve es performance when storage super size data in es.
+ superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0
+ superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
+ superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0.
+ user: ${SW_ES_USER:""}
+ password: ${SW_ES_PASSWORD:""}
+ secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests
+ syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests
+ flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
+ concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
+ resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
+ metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
+ segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
+ profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200}
+ oapAnalyzer: ${SW_STORAGE_ES_OAP_ANALYZER:"{\"analyzer\":{\"oap_analyzer\":{\"type\":\"stop\"}}}"} # the oap analyzer.
+ oapLogAnalyzer: ${SW_STORAGE_ES_OAP_LOG_ANALYZER:"{\"analyzer\":{\"oap_log_analyzer\":{\"type\":\"standard\"}}}"} # the oap log analyzer. It could be customized by the ES analyzer configuration to support more language log formats, such as Chinese log, Japanese log and etc.
+ advanced: ${SW_STORAGE_ES_ADVANCED:""}
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
index 0af07d3..8524d2a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
@@ -48,7 +48,7 @@ public interface ITraceQueryDAO extends Service {
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
/**
- * This method gives more flexible for unnative
+ * This method gives more flexible for 3rd trace without segment concept, which can't search data through {@link #queryByTraceId(String)}
*/
List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
}
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/pom.xml
deleted file mode 100644
index d217616..0000000
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/pom.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>server-receiver-plugin</artifactId>
- <groupId>org.apache.skywalking</groupId>
- <version>8.5.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>jaeger-receiver-plugin</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>receiver-proto</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>skywalking-sharing-server-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>storage-jaeger-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
deleted file mode 100644
index c17478d..0000000
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.aop.server.receiver.jaeger;
-
-import com.google.protobuf.ByteString;
-import io.grpc.stub.StreamObserver;
-import io.jaegertracing.api_v2.Collector;
-import io.jaegertracing.api_v2.CollectorServiceGrpc;
-import io.jaegertracing.api_v2.Model;
-import java.time.Instant;
-import java.util.Base64;
-import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.NodeType;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpan;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImplBase {
- private static final Logger LOGGER = LoggerFactory.getLogger(JaegerGRPCHandler.class);
-
- private SourceReceiver receiver;
- private JaegerReceiverConfig config;
-
- public JaegerGRPCHandler(SourceReceiver receiver, JaegerReceiverConfig config) {
- this.receiver = receiver;
- this.config = config;
- }
-
- @Override
- public void postSpans(Collector.PostSpansRequest request,
- StreamObserver<Collector.PostSpansResponse> responseObserver) {
-
- request.getBatch().getSpansList().forEach(span -> {
- try {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(span.toString());
- }
-
- JaegerSpan jaegerSpan = new JaegerSpan();
- jaegerSpan.setTraceId(format(span.getTraceId()));
- jaegerSpan.setSpanId(format(span.getSpanId()));
- Model.Process process = span.getProcess();
- String serviceName = null;
- if (process != null) {
- serviceName = process.getServiceName();
- }
- if (StringUtil.isEmpty(serviceName)) {
- serviceName = "UNKNOWN";
- }
- final String serviceId = IDManager.ServiceID.buildId(serviceName, NodeType.Normal);
-
- long duration = span.getDuration().getNanos() / 1_000_000;
- jaegerSpan.setStartTime(Instant.ofEpochSecond(
- span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
- long timeBucket = TimeBucket.getRecordTimeBucket(jaegerSpan.getStartTime());
- jaegerSpan.setTimeBucket(timeBucket);
- jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
- jaegerSpan.setLatency((int) duration);
- jaegerSpan.setDataBinary(span.toByteArray());
- jaegerSpan.setEndpointName(span.getOperationName());
- jaegerSpan.setServiceId(serviceId);
-
- span.getTagsList().forEach(tag -> {
- String key = tag.getKey();
- if ("error".equals(key)) {
- boolean status = tag.getVBool();
- jaegerSpan.setIsError(BooleanUtils.booleanToValue(status));
- } else if ("span.kind".equals(key)) {
- String kind = tag.getVStr();
- if ("server".equals(kind) || "consumer".equals(kind)) {
- String endpointName = span.getOperationName();
- jaegerSpan.setEndpointName(endpointName);
- jaegerSpan.setEndpointId(
- IDManager.EndpointID.buildId(serviceId, endpointName));
- }
- }
- });
-
- receiver.receive(jaegerSpan);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- });
-
- responseObserver.onNext(Collector.PostSpansResponse.newBuilder().build());
- responseObserver.onCompleted();
- }
-
- private String format(ByteString bytes) {
- Base64.Encoder encoder = Base64.getEncoder();
- return encoder.encodeToString(bytes.toByteArray());
- }
-
-}
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java
deleted file mode 100644
index 55d8f67..0000000
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.aop.server.receiver.jaeger;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Setter
-@Getter
-public class JaegerReceiverConfig extends ModuleConfig {
- private String gRPCHost = null;
- private int gRPCPort = -1;
- private int maxConcurrentCallsPerConnection;
- private int maxMessageSize;
- private int gRPCThreadPoolSize;
- private int gRPCThreadPoolQueueSize;
-}
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverModule.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverModule.java
deleted file mode 100644
index 3eebf9a..0000000
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverModule.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.aop.server.receiver.jaeger;
-
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-
-/**
- * Adapt Jaeger gRPC backend service.
- */
-public class JaegerReceiverModule extends ModuleDefine {
- public static final String NAME = "receiver_jaeger";
-
- public JaegerReceiverModule() {
- super(NAME);
- }
-
- @Override
- public Class[] services() {
- return new Class[0];
- }
-}
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java
deleted file mode 100644
index 934bd91..0000000
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.aop.server.receiver.jaeger;
-
-import java.util.Objects;
-import org.apache.logging.log4j.util.Strings;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.library.server.ServerException;
-import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
-import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
-
-public class JaegerReceiverProvider extends ModuleProvider {
- public static final String NAME = "default";
- private JaegerReceiverConfig config;
- private GRPCServer grpcServer = null;
-
- @Override
- public String name() {
- return NAME;
- }
-
- @Override
- public Class<? extends ModuleDefine> module() {
- return JaegerReceiverModule.class;
- }
-
- @Override
- public ModuleConfig createConfigBeanIfAbsent() {
- config = new JaegerReceiverConfig();
- return config;
- }
-
- @Override
- public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- if (config.getGRPCPort() > 0) {
- grpcServer = new GRPCServer(Strings.isBlank(config.getGRPCHost()) ? "0.0.0.0" : config.getGRPCHost(), config
- .getGRPCPort());
- if (config.getMaxMessageSize() > 0) {
- grpcServer.setMaxMessageSize(config.getMaxMessageSize());
- }
- if (config.getMaxConcurrentCallsPerConnection() > 0) {
- grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection());
- }
- if (config.getGRPCThreadPoolQueueSize() > 0) {
- grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize());
- }
- if (config.getGRPCThreadPoolSize() > 0) {
- grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize());
- }
- grpcServer.initialize();
- }
- }
-
- @Override
- public void start() throws ServiceNotProvidedException, ModuleStartException {
- SourceReceiver sourceReceiver = getManager().find(CoreModule.NAME).provider().getService(SourceReceiver.class);
-
- if (Objects.nonNull(grpcServer)) {
- grpcServer.addHandler(new JaegerGRPCHandler(sourceReceiver, config));
- } else {
- GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
- .provider()
- .getService(GRPCHandlerRegister.class);
- grpcHandlerRegister.addHandler(new JaegerGRPCHandler(sourceReceiver, config));
- }
-
- }
-
- @Override
- public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
- try {
- if (Objects.nonNull(grpcServer)) {
- grpcServer.start();
- }
- } catch (ServerException e) {
- throw new ModuleStartException(e.getMessage(), e);
- }
- }
-
- @Override
- public String[] requiredModules() {
- return new String[] {SharingServerModule.NAME};
- }
-}
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
deleted file mode 100644
index b7c7659..0000000
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-
-org.apache.skywalking.aop.server.receiver.jaeger.JaegerReceiverModule
diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
deleted file mode 100644
index 0faf17a..0000000
--- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-org.apache.skywalking.aop.server.receiver.jaeger.JaegerReceiverProvider
diff --git a/oap-server/server-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/pom.xml
index c9ceeb5..feb2b3e 100644
--- a/oap-server/server-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/pom.xml
@@ -36,7 +36,6 @@
<module>envoy-metrics-receiver-plugin</module>
<module>skywalking-sharing-server-plugin</module>
<module>skywalking-clr-receiver-plugin</module>
- <module>jaeger-receiver-plugin</module>
<module>receiver-proto</module>
<module>skywalking-profile-receiver-plugin</module>
<module>otel-receiver-plugin</module>
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/jaeger/collector.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/jaeger/collector.proto
deleted file mode 100755
index 3ad86c2..0000000
--- a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/jaeger/collector.proto
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright (c) 2019 The Jaeger Authors.
-// Copyright (c) 2018 Uber Technologies, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax="proto3";
-
-package jaeger.api_v2;
-
-import "jaeger/model.proto";
-import "gogoproto/gogo.proto";
-import "google/api/annotations.proto";
-import "protoc-gen-swagger/options/annotations.proto";
-
-option go_package = "api_v2";
-option java_package = "io.jaegertracing.api_v2";
-
-// Enable gogoprotobuf extensions (https://github.com/gogo/protobuf/blob/master/extensions.md).
-// Enable custom Marshal method.
-option (gogoproto.marshaler_all) = true;
-// Enable custom Unmarshal method.
-option (gogoproto.unmarshaler_all) = true;
-// Enable custom Size method (Required by Marshal and Unmarshal).
-option (gogoproto.sizer_all) = true;
-// Enable registration with golang/protobuf for the grpc-gateway.
-option (gogoproto.goproto_registration) = true;
-
-option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = {
- info: {
- version: "1.0";
- };
- external_docs: {
- url: "https://github.com/jaegertracing/jaeger";
- description: "Jaeger API";
- }
- schemes: HTTP;
- schemes: HTTPS;
-};
-
-message PostSpansRequest {
- Batch batch = 1 [
- (gogoproto.nullable) = false
- ];
-}
-
-message PostSpansResponse {
-}
-
-service CollectorService {
- rpc PostSpans(PostSpansRequest) returns (PostSpansResponse) {
- option (google.api.http) = {
- post: "/api/v2/spans"
- body: "*"
- };
- }
-}
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/jaeger/model.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/jaeger/model.proto
deleted file mode 100755
index 3bce8a1..0000000
--- a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/jaeger/model.proto
+++ /dev/null
@@ -1,160 +0,0 @@
-// Copyright (c) 2018 Uber Technologies, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax="proto3";
-
-package jaeger.api_v2;
-
-import "gogoproto/gogo.proto";
-import "google/api/annotations.proto";
-import "google/protobuf/timestamp.proto";
-import "google/protobuf/duration.proto";
-
-// TODO: document all types and fields
-
-// TODO: once this moves to jaeger-idl repo, we may want to change Go pkg to api_v2
-// and rewrite it to model only in this repo. That should make it easier to generate
-// classes in other languages.
-option go_package = "model";
-option java_package = "io.jaegertracing.api_v2";
-
-// Enable gogoprotobuf extensions (https://github.com/gogo/protobuf/blob/master/extensions.md).
-// Enable custom Marshal method.
-option (gogoproto.marshaler_all) = true;
-// Enable custom Unmarshal method.
-option (gogoproto.unmarshaler_all) = true;
-// Enable custom Size method (Required by Marshal and Unmarshal).
-option (gogoproto.sizer_all) = true;
-// Enable registration with golang/protobuf for the grpc-gateway.
-option (gogoproto.goproto_registration) = true;
-
-enum ValueType {
- STRING = 0;
- BOOL = 1;
- INT64 = 2;
- FLOAT64 = 3;
- BINARY = 4;
-};
-
-message KeyValue {
- option (gogoproto.equal) = true;
- option (gogoproto.compare) = true;
-
- string key = 1;
- ValueType v_type = 2;
- string v_str = 3;
- bool v_bool = 4;
- int64 v_int64 = 5;
- double v_float64 = 6;
- bytes v_binary = 7;
-}
-
-message Log {
- google.protobuf.Timestamp timestamp = 1 [
- (gogoproto.stdtime) = true,
- (gogoproto.nullable) = false
- ];
- repeated KeyValue fields = 2 [
- (gogoproto.nullable) = false
- ];
-}
-
-enum SpanRefType {
- CHILD_OF = 0;
- FOLLOWS_FROM = 1;
-};
-
-message SpanRef {
- bytes trace_id = 1 [
- (gogoproto.nullable) = false,
- (gogoproto.customtype) = "TraceID",
- (gogoproto.customname) = "TraceID"
- ];
- bytes span_id = 2 [
- (gogoproto.nullable) = false,
- (gogoproto.customtype) = "SpanID",
- (gogoproto.customname) = "SpanID"
- ];
- SpanRefType ref_type = 3;
-}
-
-message Process {
- string service_name = 1;
- repeated KeyValue tags = 2 [
- (gogoproto.nullable) = false
- ];
-}
-
-message Span {
- bytes trace_id = 1 [
- (gogoproto.nullable) = false,
- (gogoproto.customtype) = "TraceID",
- (gogoproto.customname) = "TraceID"
- ];
- bytes span_id = 2 [
- (gogoproto.nullable) = false,
- (gogoproto.customtype) = "SpanID",
- (gogoproto.customname) = "SpanID"
- ];
- string operation_name = 3;
- repeated SpanRef references = 4 [
- (gogoproto.nullable) = false
- ];
- uint32 flags = 5 [
- (gogoproto.nullable) = false,
- (gogoproto.customtype) = "Flags"
- ];
- google.protobuf.Timestamp start_time = 6 [
- (gogoproto.stdtime) = true,
- (gogoproto.nullable) = false
- ];
- google.protobuf.Duration duration = 7 [
- (gogoproto.stdduration) = true,
- (gogoproto.nullable) = false
- ];
- repeated KeyValue tags = 8 [
- (gogoproto.nullable) = false
- ];
- repeated Log logs = 9 [
- (gogoproto.nullable) = false
- ];
- Process process = 10;
- string process_id = 11 [
- (gogoproto.customname) = "ProcessID"
- ];
- repeated string warnings = 12;
-}
-
-message Trace {
- message ProcessMapping {
- string process_id = 1 [
- (gogoproto.customname) = "ProcessID"
- ];
- Process process = 2 [
- (gogoproto.nullable) = false
- ];
- }
- repeated Span spans = 1;
- repeated ProcessMapping process_map = 2 [
- (gogoproto.nullable) = false
- ];
- repeated string warnings = 3;
-}
-
-message Batch {
- repeated Span spans = 1;
- Process process = 2 [
- (gogoproto.nullable) = true
- ];
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
index 968e04b..b34dc1e 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml
@@ -31,24 +31,10 @@
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
- <artifactId>skywalking-trace-receiver-plugin</artifactId>
+ <artifactId>storage-zipkin-elasticsearch7-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>storage-zipkin-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>skywalking-management-receiver-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- </dependency>
- <dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</dependency>
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
index f85e709..ded3932 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java
@@ -33,7 +33,4 @@ public class ZipkinReceiverConfig extends ModuleConfig {
private long jettyIdleTimeOut = 30000;
private int jettyAcceptorPriorityDelta = 0;
private int jettyAcceptQueueSize = 0;
- private int expireTime = 20;
- private int maxCacheSize = 1_000_000;
- private boolean needAnalysis = false;
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
index 9e8269b..81e6d28 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.receiver.zipkin;
-import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -28,9 +27,6 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServerConfig;
-import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.Receiver2AnalysisBridge;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
@@ -82,14 +78,6 @@ public class ZipkinReceiverProvider extends ModuleProvider {
jettyServer.addHandler(new SpanV1JettyHandler(config, getManager()));
jettyServer.addHandler(new SpanV2JettyHandler(config, getManager()));
-
- if (config.isNeedAnalysis()) {
- ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME)
- .provider()
- .getService(ISegmentParserService.class);
- Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
- Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
- }
}
@Override
@@ -103,13 +91,6 @@ public class ZipkinReceiverProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
- if (config.isNeedAnalysis()) {
- return new String[] {TraceModule.NAME};
- } else {
- /**
- * In pure trace status, we don't need the trace receiver.
- */
- return new String[] {CoreModule.NAME};
- }
+ return new String[] {CoreModule.NAME};
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
deleted file mode 100644
index e9300ba..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
-
-import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.ISegmentParserService;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener;
-
-/**
- * Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
- */
-public class Receiver2AnalysisBridge implements SegmentListener {
- private ISegmentParserService segmentParseService;
-
- public Receiver2AnalysisBridge(ISegmentParserService segmentParseService) {
- this.segmentParseService = segmentParseService;
- }
-
- @Override
- public void notify(SkyWalkingTrace trace) {
- trace.getSegmentList().forEach(upstream -> segmentParseService.send(upstream.build()));
-
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
deleted file mode 100644
index a44d887..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis;
-
-import java.util.List;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory;
-import zipkin2.Span;
-
-public class ZipkinSkyWalkingTransfer {
- public void doTransfer(ZipkinReceiverConfig config, List<Span> spanList) {
- spanList.forEach(span -> {
- // In Zipkin, the local service name represents the application owner.
- CacheFactory.INSTANCE.get(config).addSpan(span);
- });
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
deleted file mode 100644
index bffa458..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
-
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache;
-
-public class CacheFactory {
- public static final CacheFactory INSTANCE = new CacheFactory();
-
- private ISpanCache implementor;
-
- private CacheFactory() {
- }
-
- public ISpanCache get(ZipkinReceiverConfig config) {
- if (implementor == null) {
- synchronized (INSTANCE) {
- if (implementor == null) {
- implementor = new CaffeineSpanCache(config);
- }
- }
- }
- return implementor;
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
deleted file mode 100644
index b4f9d4f..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;
-
-import zipkin2.Span;
-
-public interface ISpanCache {
- void addSpan(Span span);
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
deleted file mode 100644
index f5129e3..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.RemovalCause;
-import com.github.benmanes.caffeine.cache.RemovalListener;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.ISpanCache;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import zipkin2.Span;
-
-/**
- * NOTICE: FROM my test, Caffeine cache triggers/checks expire only face write/read op. In order to make trace finish in
- * time, I have to set a timer to write a meaningless trace, for active expire.
- */
-public class CaffeineSpanCache implements ISpanCache, RemovalListener<String, ZipkinTrace> {
- private static final Logger LOGGER = LoggerFactory.getLogger(CaffeineSpanCache.class);
- private Cache<String, ZipkinTrace> inProcessSpanCache;
- private ReentrantLock newTraceLock;
-
- public CaffeineSpanCache(ZipkinReceiverConfig config) {
- newTraceLock = new ReentrantLock();
- inProcessSpanCache = Caffeine.newBuilder()
- .expireAfterWrite(config.getExpireTime(), TimeUnit.SECONDS)
- .maximumSize(config.getMaxCacheSize())
- .removalListener(this)
- .build();
- Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
- inProcessSpanCache.put("ACTIVE", new ZipkinTrace.TriggerTrace());
- }, 2, 3, TimeUnit.SECONDS);
- }
-
- /**
- * Zipkin trace finished by the expired rule.
- */
- @Override
- public void onRemoval(@Nullable String key, @Nullable ZipkinTrace trace, @Nonnull RemovalCause cause) {
- if (trace instanceof ZipkinTrace.TriggerTrace) {
- return;
- }
- try {
- Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- LOGGER.warn("Zipkin trace:" + trace);
- }
- }
-
- @Override
- public void addSpan(Span span) {
- ZipkinTrace trace = inProcessSpanCache.getIfPresent(span.traceId());
- if (trace == null) {
- newTraceLock.lock();
- try {
- trace = inProcessSpanCache.getIfPresent(span.traceId());
- if (trace == null) {
- trace = new ZipkinTrace();
- inProcessSpanCache.put(span.traceId(), trace);
- }
- } finally {
- newTraceLock.unlock();
- }
- }
- trace.addSpan(span);
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
deleted file mode 100644
index c384dd6..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
-
-import java.util.List;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
-
-/**
- * Each SkyWalkingTrace consists of segments in each application, original from {@link ZipkinTrace}s
- */
-@RequiredArgsConstructor
-@Getter
-public class SkyWalkingTrace {
- private final List<SegmentObject.Builder> segmentList;
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
deleted file mode 100644
index 89df070..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
-import zipkin2.Span;
-
-public class ZipkinTrace {
- private List<Span> spans;
- private ReentrantLock spanWriteLock;
-
- public ZipkinTrace() {
- spans = new LinkedList<>();
- spanWriteLock = new ReentrantLock();
- }
-
- public void addSpan(Span span) {
- spanWriteLock.lock();
- try {
- spans.add(span);
- } finally {
- spanWriteLock.unlock();
- }
- }
-
- public List<Span> getSpans() {
- return spans;
- }
-
- @Override
- public String toString() {
- return "ZipkinTrace{" + "spans=" + spans + '}';
- }
-
- public static class TriggerTrace extends ZipkinTrace {
-
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
deleted file mode 100644
index 8f8ad3c..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
-
-import com.google.common.base.Strings;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
-import org.apache.skywalking.apm.network.language.agent.v3.Log;
-import org.apache.skywalking.apm.network.language.agent.v3.RefType;
-import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
-import org.apache.skywalking.apm.network.language.agent.v3.SegmentReference;
-import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
-import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
-import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
-import zipkin2.Endpoint;
-import zipkin2.Span;
-
-public class SegmentBuilder {
- private Context context;
- private LinkedList<Segment> segments;
- private Map<String, ClientSideSpan> clientPartSpan;
-
- private SegmentBuilder() {
- segments = new LinkedList<>();
- context = new Context();
- clientPartSpan = new HashMap<>();
- }
-
- public static SkyWalkingTrace build(List<Span> traceSpans) throws Exception {
- SegmentBuilder builder = new SegmentBuilder();
- // This map groups the spans by their parent id, in order to assist to build tree.
- // key: parentId
- // value: span
- Map<String, List<Span>> childSpanMap = new HashMap<>();
- AtomicReference<Span> root = new AtomicReference<>();
- traceSpans.forEach(span -> {
- if (span.parentId() == null) {
- root.set(span);
- }
- List<Span> spanList = childSpanMap.get(span.parentId());
- if (spanList == null) {
- spanList = new LinkedList<>();
- spanList.add(span);
- childSpanMap.put(span.parentId(), spanList);
- } else {
- spanList.add(span);
- }
- });
-
- Span rootSpan = root.get();
- long timestamp = 0;
- if (rootSpan != null) {
- String applicationCode = rootSpan.localServiceName();
- // If root span doesn't include applicationCode, a.k.a local service name,
- // Segment can't be built
- // Ignore the whole trace.
- // :P Hope anyone could provide better solution.
- // Wu Sheng.
- if (!Strings.isNullOrEmpty(applicationCode)) {
- timestamp = rootSpan.timestampAsLong();
- builder.context.addService(applicationCode);
-
- SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true);
- builder.context.currentSegment().addSpan(rootSpanBuilder);
- builder.scanSpansFromRoot(rootSpanBuilder, rootSpan, childSpanMap);
-
- builder.segments.add(builder.context.removeApp());
- }
- }
-
- List<SegmentObject.Builder> segmentBuilders = new LinkedList<>();
- // microseconds -> million seconds
- long finalTimestamp = timestamp / 1000;
- builder.segments.forEach(segment -> {
- SegmentObject.Builder traceSegmentBuilder = segment.freeze();
- segmentBuilders.add(traceSegmentBuilder);
- });
- return new SkyWalkingTrace(segmentBuilders);
- }
-
- private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent,
- Map<String, List<Span>> childSpanMap) throws Exception {
- String parentId = parent.id();
- // get child spans by parent span id
- List<Span> spanList = childSpanMap.get(parentId);
- if (spanList == null) {
- return;
- }
- for (Span childSpan : spanList) {
- String localServiceName = childSpan.localServiceName();
- boolean isNewApp = false;
- if (StringUtil.isNotEmpty(localServiceName)) {
- if (context.isServiceChanged(localServiceName)) {
- isNewApp = true;
- }
- }
-
- try {
- if (isNewApp) {
- context.addService(localServiceName);
- }
- SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp);
-
- context.currentSegment().addSpan(childSpanBuilder);
- scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap);
-
- } finally {
- if (isNewApp) {
- segments.add(context.removeApp());
- }
- }
- }
- }
-
- private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span,
- boolean isSegmentRoot) {
- SpanObject.Builder spanBuilder = SpanObject.newBuilder();
- spanBuilder.setSpanId(context.currentIDs().nextSpanId());
- if (isSegmentRoot) {
- // spanId = -1, means no parent span
- // spanId is considered unique, and from a positive sequence in each segment.
- spanBuilder.setParentSpanId(-1);
- }
- if (!isSegmentRoot && parentSegmentSpan != null) {
- spanBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
- }
- Span.Kind kind = span.kind();
- String opName = Strings.isNullOrEmpty(span.name()) ? "-" : span.name();
- spanBuilder.setOperationName(opName);
- ClientSideSpan clientSideSpan;
- switch (kind) {
- case CLIENT:
- spanBuilder.setSpanType(SpanType.Exit);
- String peer = getPeer(parentSpan, span);
- if (peer != null) {
- spanBuilder.setPeer(peer);
- }
- clientSideSpan = new ClientSideSpan(span, spanBuilder);
- clientPartSpan.put(span.id(), clientSideSpan);
- break;
- case SERVER:
- spanBuilder.setSpanType(SpanType.Entry);
- this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
- break;
- case CONSUMER:
- spanBuilder.setSpanType(SpanType.Entry);
- this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
- break;
- case PRODUCER:
- spanBuilder.setSpanType(SpanType.Exit);
- peer = getPeer(parentSpan, span);
- if (peer != null) {
- spanBuilder.setPeer(peer);
- }
- clientSideSpan = new ClientSideSpan(span, spanBuilder);
- clientPartSpan.put(span.id(), clientSideSpan);
- break;
- default:
- spanBuilder.setSpanType(SpanType.Local);
- }
- // microseconds in Zipkin -> milliseconds in SkyWalking
- long startTime = span.timestamp() / 1000;
- // Some implement of zipkin client not include duration field in its report
- // package when duration's value be 0ms, Causing a null pointer exception here.
- Long durationObj = span.duration();
- long duration = (durationObj == null) ? 0 : durationObj.longValue() / 1000;
- spanBuilder.setStartTime(startTime);
- spanBuilder.setEndTime(startTime + duration);
-
- span.tags()
- .forEach((tagKey, tagValue) -> spanBuilder.addTags(KeyStringValuePair.newBuilder()
- .setKey(tagKey)
- .setValue(tagValue)
- .build()));
-
- span.annotations()
- .forEach(annotation -> spanBuilder.addLogs(Log.newBuilder()
- .setTime(annotation.timestamp() / 1000)
- .addData(KeyStringValuePair.newBuilder()
- .setKey("zipkin.annotation")
- .setValue(annotation.value())
- .build())));
-
- return spanBuilder;
- }
-
- private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan,
- Span parentSpan) {
- Segment parentSegment = context.parentSegment();
- if (parentSegment == null) {
- return;
- }
- Segment rootSegment = context.rootSegment();
- if (rootSegment == null) {
- return;
- }
-
- if (span.shared() != null && span.shared()) {
- // using same span id in client and server for RPC
- // SkyWalking will build both sides of span
- ClientSideSpan clientSideSpan = clientPartSpan.get(span.id());
- if (clientSideSpan != null) {
- // For the root span, there may be no ref, because of no parent.
- parentSegmentSpan = clientSideSpan.getBuilder();
- parentSpan = clientSideSpan.getSpan();
- }
- }
-
- String peer = getPeer(parentSpan, span);
- if (StringUtil.isEmpty(peer)) {
- //The IP is the most important for building the ref at both sides.
- return;
- }
-
- SegmentReference.Builder refBuilder = SegmentReference.newBuilder();
-
- // parent ref info
- refBuilder.setNetworkAddressUsedAtPeer(peer);
- parentSegmentSpan.setPeer(refBuilder.getNetworkAddressUsedAtPeer());
- refBuilder.setParentServiceInstance(parentSegment.builder().getServiceInstance());
- refBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
- refBuilder.setParentTraceSegmentId(parentSegment.builder().getTraceSegmentId());
- refBuilder.setParentEndpoint(parentSegment.getEntryEndpointName());
- refBuilder.setRefType(RefType.CrossProcess);
-
- spanBuilder.addRefs(refBuilder);
- }
-
- private String getPeer(Span parentSpan, Span childSpan) {
- String peer;
-
- Endpoint serverEndpoint = childSpan == null ? null : childSpan.localEndpoint();
- peer = endpoint2Peer(serverEndpoint);
-
- if (peer == null) {
- Endpoint clientEndpoint = parentSpan == null ? null : parentSpan.remoteEndpoint();
- peer = endpoint2Peer(clientEndpoint);
- }
-
- return peer;
- }
-
- private String endpoint2Peer(Endpoint endpoint) {
- String ip = null;
- Integer port = 0;
-
- if (endpoint != null) {
- if (!Strings.isNullOrEmpty(endpoint.ipv4())) {
- ip = endpoint.ipv4();
- port = endpoint.port();
- } else if (!Strings.isNullOrEmpty(endpoint.ipv6())) {
- ip = endpoint.ipv6();
- port = endpoint.port();
- }
- }
- if (ip == null) {
- return null;
- } else {
- return port == null || port == 0 ? ip : ip + ":" + port;
- }
- }
-
- /**
- * Context holds the values in build process.
- */
- private class Context {
- private LinkedList<Segment> segmentsStack = new LinkedList<>();
-
- private boolean isServiceChanged(String service) {
- return !Strings.isNullOrEmpty(service) && !service.equals(currentIDs().service);
- }
-
- private Segment addService(String serviceCode) throws Exception {
- Segment segment = new Segment(serviceCode, serviceCode);
- segmentsStack.add(segment);
- return segment;
- }
-
- private IDCollection currentIDs() {
- return segmentsStack.getLast().ids;
- }
-
- private Segment currentSegment() {
- return segmentsStack.getLast();
- }
-
- private Segment parentSegment() {
- if (segmentsStack.size() < 2) {
- return null;
- } else {
- return segmentsStack.get(segmentsStack.size() - 2);
- }
-
- }
-
- private Segment rootSegment() {
- if (segmentsStack.size() < 2) {
- return null;
- } else {
- return segmentsStack.getFirst();
- }
- }
-
- private Segment removeApp() {
- return segmentsStack.removeLast();
- }
-
- private int waitForExchange(Callable<Integer> callable, int retry) throws Exception {
- for (int i = 0; i < retry; i++) {
- Integer id = callable.call();
- if (id == 0) {
- Thread.sleep(1000L);
- } else {
- return id;
- }
- }
- throw new TimeoutException("ID exchange costs more than expected.");
- }
- }
-
- private class Segment {
- private SegmentObject.Builder segmentBuilder;
- private IDCollection ids;
- private String entryEndpointName = null;
- private List<SpanObject.Builder> spans;
- private long endTime = 0;
-
- private Segment(String service, String serviceInstance) {
- ids = new IDCollection(service, serviceInstance);
- spans = new LinkedList<>();
- segmentBuilder = SegmentObject.newBuilder();
- segmentBuilder.setService(service);
- segmentBuilder.setServiceInstance(serviceInstance);
- segmentBuilder.setTraceSegmentId(UUID.randomUUID().toString().replaceAll("-", ""));
- }
-
- private SegmentObject.Builder builder() {
- return segmentBuilder;
- }
-
- private void addSpan(SpanObject.Builder spanBuilder) {
- String operationName = spanBuilder.getOperationName();
- if (StringUtil.isEmpty(entryEndpointName) && !Strings.isNullOrEmpty(operationName)) {
- if (SpanType.Entry.equals(spanBuilder.getSpanType())) {
- if (!Strings.isNullOrEmpty(operationName)) {
- entryEndpointName = operationName;
- }
- }
- }
-
- // init by root span
- if (spanBuilder.getSpanId() == 1 && StringUtil.isEmpty(entryEndpointName)) {
- if (!Strings.isNullOrEmpty(operationName)) {
- entryEndpointName = operationName;
- }
- }
-
- spans.add(spanBuilder);
- if (spanBuilder.getEndTime() > endTime) {
- endTime = spanBuilder.getEndTime();
- }
- }
-
- public String getEntryEndpointName() {
- return entryEndpointName;
- }
-
- private IDCollection ids() {
- return ids;
- }
-
- public SegmentObject.Builder freeze() {
- for (SpanObject.Builder span : spans) {
- segmentBuilder.addSpans(span);
- }
- return segmentBuilder;
- }
- }
-
- private class IDCollection {
- private String service;
- private String instanceName;
- private int spanIdSeq;
-
- private IDCollection(String service, String instanceName) {
- this.service = service;
- this.instanceName = instanceName;
- this.spanIdSeq = 0;
- }
-
- private int nextSpanId() {
- return spanIdSeq++;
- }
- }
-
- private class ClientSideSpan {
- private Span span;
- private SpanObject.Builder builder;
-
- public ClientSideSpan(Span span, SpanObject.Builder builder) {
- this.span = span;
- this.builder = builder;
- }
-
- public Span getSpan() {
- return span;
- }
-
- public SpanObject.Builder getBuilder() {
- return builder;
- }
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
deleted file mode 100644
index 5c37c9a..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
-
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
-
-public interface SegmentListener {
- void notify(SkyWalkingTrace trace);
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
deleted file mode 100644
index 9a677c1..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
-
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
-import zipkin2.Span;
-
-public class Zipkin2SkyWalkingTransfer {
- public static Zipkin2SkyWalkingTransfer INSTANCE = new Zipkin2SkyWalkingTransfer();
- private List<SegmentListener> listeners = new LinkedList<>();
-
- private Zipkin2SkyWalkingTransfer() {
- }
-
- public void addListener(SegmentListener listener) {
- listeners.add(listener);
- }
-
- public void transfer(ZipkinTrace trace) throws Exception {
- List<Span> traceSpans = trace.getSpans();
-
- if (traceSpans.size() > 0) {
- SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans);
-
- listeners.forEach(listener -> listener.notify(skyWalkingTrace));
-
- }
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
index 44852de..8ae6121 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java
@@ -24,19 +24,18 @@ import java.io.InputStream;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.servlet.http.HttpServletRequest;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.ZipkinSkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
+@RequiredArgsConstructor
public class SpanProcessor {
- private SourceReceiver receiver;
-
- public SpanProcessor(SourceReceiver receiver) {
- this.receiver = receiver;
- }
+ private final NamingControl namingControl;
+ private final SourceReceiver receiver;
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException {
try (InputStream inputStream = getInputStream(request)) {
@@ -50,13 +49,8 @@ public class SpanProcessor {
List<Span> spanList = decoder.decodeList(out.toByteArray());
- if (config.isNeedAnalysis()) {
- ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer();
- transfer.doTransfer(config, spanList);
- } else {
- SpanForward forward = new SpanForward(config, receiver);
- forward.send(spanList);
- }
+ SpanForward forward = new SpanForward(namingControl, receiver);
+ forward.send(spanList);
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
index 7f506dc..817d9ec 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
@@ -20,23 +20,24 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
+@Slf4j
public class SpanV1JettyHandler extends JettyHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(SpanV1JettyHandler.class);
-
- private ZipkinReceiverConfig config;
- private SourceReceiver sourceReceiver;
+ private final ZipkinReceiverConfig config;
+ private final SourceReceiver sourceReceiver;
+ private final NamingControl namingControl;
public SpanV1JettyHandler(ZipkinReceiverConfig config, ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.config = config;
}
@@ -57,14 +58,14 @@ public class SpanV1JettyHandler extends JettyHandler {
SpanBytesDecoder decoder = SpanEncode.isThrift(encode) ? SpanBytesDecoder.THRIFT : SpanBytesDecoder.JSON_V1;
- SpanProcessor processor = new SpanProcessor(sourceReceiver);
+ SpanProcessor processor = new SpanProcessor(namingControl, sourceReceiver);
processor.convert(config, decoder, request);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
- LOGGER.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
index bc2e6da..00a07be 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
@@ -20,23 +20,25 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
+@Slf4j
public class SpanV2JettyHandler extends JettyHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(SpanV2JettyHandler.class);
- private ZipkinReceiverConfig config;
- private SourceReceiver sourceReceiver;
+ private final ZipkinReceiverConfig config;
+ private final SourceReceiver sourceReceiver;
+ private final NamingControl namingControl;
public SpanV2JettyHandler(ZipkinReceiverConfig config, ModuleManager manager) {
sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
+ namingControl = manager.find(CoreModule.NAME).provider().getService(NamingControl.class);
this.config = config;
}
@@ -57,14 +59,14 @@ public class SpanV2JettyHandler extends JettyHandler {
SpanBytesDecoder decoder = SpanEncode.isProto3(encode) ? SpanBytesDecoder.PROTO3 : SpanBytesDecoder.JSON_V2;
- SpanProcessor processor = new SpanProcessor(sourceReceiver);
+ SpanProcessor processor = new SpanProcessor(namingControl, sourceReceiver);
processor.convert(config, decoder, request);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
- LOGGER.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
index 043ad7b..3cde23a 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -19,26 +19,25 @@
package org.apache.skywalking.oap.server.receiver.zipkin.trace;
import java.util.List;
+import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.source.EndpointMeta;
+import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
+@RequiredArgsConstructor
public class SpanForward {
- private ZipkinReceiverConfig config;
- private SourceReceiver receiver;
-
- public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver) {
- this.config = config;
- this.receiver = receiver;
- }
+ private final NamingControl namingControl;
+ private final SourceReceiver receiver;
public void send(List<Span> spanList) {
spanList.forEach(span -> {
@@ -49,27 +48,28 @@ public class SpanForward {
if (StringUtil.isEmpty(serviceName)) {
serviceName = "Unknown";
}
+ serviceName = namingControl.formatServiceName(serviceName);
zipkinSpan.setServiceId(IDManager.ServiceID.buildId(serviceName, NodeType.Normal));
- String spanName = span.name();
- Span.Kind kind = span.kind();
- switch (kind) {
- case SERVER:
- case CONSUMER:
- if (!StringUtil.isEmpty(spanName)) {
- zipkinSpan.setEndpointId(IDManager.EndpointID.buildId(zipkinSpan.getServiceId(), span.name()));
- }
- }
- if (!StringUtil.isEmpty(spanName)) {
- zipkinSpan.setEndpointName(spanName);
- }
long startTime = span.timestampAsLong() / 1000;
zipkinSpan.setStartTime(startTime);
- if (startTime != 0) {
- long timeBucket = TimeBucket.getRecordTimeBucket(zipkinSpan.getStartTime());
- zipkinSpan.setTimeBucket(timeBucket);
- }
+ long timeBucket = TimeBucket.getRecordTimeBucket(zipkinSpan.getStartTime());
+ zipkinSpan.setTimeBucket(timeBucket);
+
+ String spanName = span.name();
+ if (!StringUtil.isEmpty(spanName)) {
+ final String endpointName = namingControl.formatEndpointName(serviceName, spanName);
+ zipkinSpan.setEndpointName(endpointName);
+ zipkinSpan.setEndpointId(IDManager.EndpointID.buildId(zipkinSpan.getServiceId(), endpointName));
+ //Create endpoint meta for the server side span
+ EndpointMeta endpointMeta = new EndpointMeta();
+ endpointMeta.setServiceName(serviceName);
+ endpointMeta.setServiceNodeType(NodeType.Normal);
+ endpointMeta.setEndpoint(endpointName);
+ endpointMeta.setTimeBucket(timeBucket);
+ receiver.receive(endpointMeta);
+ }
long latency = span.durationAsLong() / 1000;
zipkinSpan.setEndTime(startTime + latency);
@@ -78,7 +78,19 @@ public class SpanForward {
zipkinSpan.setLatency((int) latency);
zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span));
+ span.tags().forEach((key, value) -> {
+ zipkinSpan.getTags().add(key + "=" + value);
+ });
+
receiver.receive(zipkinSpan);
+
+ // Create the metadata source
+ // No instance name is required in the Zipkin model.
+ ServiceMeta serviceMeta = new ServiceMeta();
+ serviceMeta.setName(serviceName);
+ serviceMeta.setNodeType(NodeType.Normal);
+ serviceMeta.setTimeBucket(timeBucket);
+ receiver.receive(serviceMeta);
});
}
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
deleted file mode 100644
index fed8448..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform;
-
-import java.io.UnsupportedEncodingException;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
-import org.apache.skywalking.apm.network.language.agent.v3.SegmentReference;
-import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
-import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
-import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace;
-import org.junit.Assert;
-import org.junit.Test;
-import zipkin2.Span;
-import zipkin2.codec.SpanBytesDecoder;
-
-public class SpringSleuthSegmentBuilderTest implements SegmentListener {
- @Test
- public void testTransform() throws Exception {
-
- Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this);
-
- List<Span> spanList = buildSpringSleuthExampleTrace();
- Assert.assertEquals(3, spanList.size());
-
- ZipkinTrace trace = new ZipkinTrace();
- spanList.forEach(span -> trace.addSpan(span));
-
- Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
- }
-
- private List<Span> buildSpringSleuthExampleTrace() throws UnsupportedEncodingException {
- List<Span> spans = new LinkedList<>();
- String span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"id\":\"1a8a1b5bdd791b8a\",\"kind\":\"SERVER\",\"name\":\"get /\",\"timestamp\":1527669813700123,\"duration\":11295,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv6\":\"::1\",\"port\":55146},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/\",\"mvc.controller.class\":\"Frontend\",\"mvc.controller.method\":\"callBackend\"}}";
- spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
- span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"CLIENT\",\"name\":\"get\",\"timestamp\":1527669813702456,\"duration\":6672,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\"}}";
- spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
- span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"SERVER\",\"name\":\"get /api\",\"timestamp\":1527669813705106,\"duration\":4802,\"localEndpoint\":{\"serviceName\":\"backend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv4\":\"127.0.0.1\",\"port\":55147},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\",\"mvc.controller.class\":\"Backend\",\"mvc.controller.method\":\"printDate\"},\"s [...]
- spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
-
- return SpanBytesDecoder.JSON_V2.decodeList(spans.toString().getBytes("UTF-8"));
- }
-
- @Override
- public void notify(SkyWalkingTrace trace) {
- List<SegmentObject.Builder> segments = trace.getSegmentList();
- Assert.assertEquals(2, segments.size());
- SegmentObject.Builder builder = segments.get(0);
- SegmentObject.Builder builder1 = segments.get(1);
- SegmentObject.Builder front, end;
- if (builder.getService().equals("frontend")) {
- front = builder;
- end = builder1;
- } else if (builder.getService().equals("backend")) {
- end = builder;
- front = builder1;
- } else {
- Assert.fail("Can't find frontend and backend applications, " + builder.getService());
- return;
- }
-
- Assert.assertEquals(2, front.getSpansCount());
- Assert.assertEquals(1, end.getSpansCount());
-
- front.getSpansList().forEach(spanObject -> {
- if (spanObject.getSpanId() == 0) {
- // span id = 1, means incoming http of frontend
- Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
- Assert.assertEquals("get /", spanObject.getOperationName());
- Assert.assertEquals(0, spanObject.getSpanId());
- Assert.assertEquals(-1, spanObject.getParentSpanId());
- } else if (spanObject.getSpanId() == 1) {
- Assert.assertEquals("192.168.72.220", spanObject.getPeer());
- Assert.assertEquals(SpanType.Exit, spanObject.getSpanType());
- Assert.assertEquals(1, spanObject.getSpanId());
- Assert.assertEquals(0, spanObject.getParentSpanId());
- } else {
- Assert.fail("Only two spans expected");
- }
- Assert.assertTrue(spanObject.getTagsCount() > 0);
- });
-
- SpanObject spanObject = end.getSpans(0);
-
- Assert.assertEquals(1, spanObject.getRefsCount());
- SegmentReference spanObjectRef = spanObject.getRefs(0);
- Assert.assertEquals("get /", spanObjectRef.getParentEndpoint());
- Assert.assertEquals("192.168.72.220", spanObjectRef.getNetworkAddressUsedAtPeer());
- Assert.assertEquals(1, spanObjectRef.getParentSpanId());
- Assert.assertEquals(front.getTraceSegmentId(), spanObjectRef.getParentTraceSegmentId());
-
- Assert.assertTrue(spanObject.getTagsCount() > 0);
-
- Assert.assertEquals("get /api", spanObject.getOperationName());
- Assert.assertEquals(0, spanObject.getSpanId());
- Assert.assertEquals(-1, spanObject.getParentSpanId());
- Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
- }
-}
diff --git a/oap-server/server-starter-es7/pom.xml b/oap-server/server-starter-es7/pom.xml
index bc458ef..ad32b33 100644
--- a/oap-server/server-starter-es7/pom.xml
+++ b/oap-server/server-starter-es7/pom.xml
@@ -44,6 +44,11 @@
<artifactId>storage-elasticsearch7-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>zipkin-receiver-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- profile exporter -->
<dependency>
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index baf3abb..88a68f7 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -45,16 +45,6 @@
<artifactId>storage-zipkin-plugin</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>jaeger-receiver-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>zipkin-receiver-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
<!-- profile exporter -->
<dependency>
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml
index 42c38e4..38a813c 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/pom.xml
@@ -31,8 +31,7 @@
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
<module>storage-elasticsearch7-plugin</module>
- <module>storage-zipkin-plugin</module>
- <module>storage-jaeger-plugin</module>
+ <module>storage-zipkin-elasticsearch7-plugin</module>
<module>storage-influxdb-plugin</module>
<module>storage-tidb-plugin</module>
</modules>
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/pom.xml b/oap-server/server-storage-plugin/storage-jaeger-plugin/pom.xml
deleted file mode 100644
index 350fb64..0000000
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>server-storage-plugin</artifactId>
- <groupId>org.apache.skywalking</groupId>
- <version>8.5.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>storage-jaeger-plugin</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>storage-elasticsearch-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>receiver-proto</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpan.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpan.java
deleted file mode 100644
index 27ccbf6..0000000
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpan.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.jaeger;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
-import org.apache.skywalking.oap.server.core.source.Source;
-
-import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.JAEGER_SPAN;
-
-@ScopeDeclaration(id = JAEGER_SPAN, name = "JaegerSpan")
-public class JaegerSpan extends Source {
-
- @Override
- public int scope() {
- return DefaultScopeDefine.JAEGER_SPAN;
- }
-
- @Override
- public String getEntityId() {
- return traceId + spanId;
- }
-
- @Setter
- @Getter
- private String traceId;
- @Setter
- @Getter
- private String spanId;
- @Setter
- @Getter
- private String serviceId;
- @Setter
- @Getter
- private String serviceInstanceId;
- @Setter
- @Getter
- private String endpointName;
- @Setter
- @Getter
- private String endpointId;
- @Setter
- @Getter
- private long startTime;
- @Setter
- @Getter
- private long endTime;
- @Setter
- @Getter
- private int latency;
- @Setter
- @Getter
- private int isError;
- @Setter
- @Getter
- private byte[] dataBinary;
- @Setter
- @Getter
- private int encode;
-}
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecord.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecord.java
deleted file mode 100644
index ba74dba..0000000
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecord.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.jaeger;
-
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.analysis.Stream;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
-import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.annotation.Column;
-import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-
-@SuperDataset
-@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, builder = JaegerSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
-public class JaegerSpanRecord extends Record {
- public static final String INDEX_NAME = "jaeger_span";
- public static final String TRACE_ID = "trace_id";
- public static final String SPAN_ID = "span_id";
- public static final String SERVICE_ID = "service_id";
- public static final String SERVICE_INSTANCE_ID = "service_instance_id";
- public static final String ENDPOINT_NAME = "endpoint_name";
- public static final String ENDPOINT_ID = "endpoint_id";
- public static final String START_TIME = "start_time";
- public static final String END_TIME = "end_time";
- public static final String LATENCY = "latency";
- public static final String IS_ERROR = "is_error";
- public static final String DATA_BINARY = "data_binary";
- public static final String ENCODE = "encode";
-
- @Setter
- @Getter
- @Column(columnName = TRACE_ID)
- private String traceId;
- @Setter
- @Getter
- @Column(columnName = SPAN_ID)
- private String spanId;
- @Setter
- @Getter
- @Column(columnName = SERVICE_ID)
- private String serviceId;
- @Setter
- @Getter
- @Column(columnName = SERVICE_INSTANCE_ID)
- private String serviceInstanceId;
- @Setter
- @Getter
- @Column(columnName = ENDPOINT_NAME, matchQuery = true)
- private String endpointName;
- @Setter
- @Getter
- @Column(columnName = ENDPOINT_ID)
- private String endpointId;
- @Setter
- @Getter
- @Column(columnName = START_TIME)
- private long startTime;
- @Setter
- @Getter
- @Column(columnName = END_TIME)
- private long endTime;
- @Setter
- @Getter
- @Column(columnName = LATENCY)
- private int latency;
- @Setter
- @Getter
- @Column(columnName = IS_ERROR)
- private int isError;
- @Setter
- @Getter
- @Column(columnName = DATA_BINARY)
- private byte[] dataBinary;
- @Setter
- @Getter
- @Column(columnName = ENCODE)
- private int encode;
-
- @Override
- public String id() {
- return traceId + "-" + spanId;
- }
-
- public static class Builder implements StorageHashMapBuilder<JaegerSpanRecord> {
-
- @Override
- public Map<String, Object> entity2Storage(JaegerSpanRecord storageData) {
- Map<String, Object> map = new HashMap<>();
- map.put(TRACE_ID, storageData.getTraceId());
- map.put(SPAN_ID, storageData.getSpanId());
- map.put(SERVICE_ID, storageData.getServiceId());
- map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
- map.put(ENDPOINT_NAME, storageData.getEndpointName());
- map.put(ENDPOINT_ID, storageData.getEndpointId());
- map.put(START_TIME, storageData.getStartTime());
- map.put(END_TIME, storageData.getEndTime());
- map.put(LATENCY, storageData.getLatency());
- map.put(IS_ERROR, storageData.getIsError());
- map.put(TIME_BUCKET, storageData.getTimeBucket());
- if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
- map.put(DATA_BINARY, Const.EMPTY_STRING);
- } else {
- map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
- }
- map.put(ENCODE, storageData.getEncode());
- return map;
- }
-
- @Override
- public JaegerSpanRecord storage2Entity(Map<String, Object> dbMap) {
- JaegerSpanRecord record = new JaegerSpanRecord();
- record.setTraceId((String) dbMap.get(TRACE_ID));
- record.setSpanId((String) dbMap.get(SPAN_ID));
- record.setServiceId((String) dbMap.get(SERVICE_ID));
- record.setServiceInstanceId((String) dbMap.get(SERVICE_INSTANCE_ID));
- record.setEndpointName((String) dbMap.get(ENDPOINT_NAME));
- record.setEndpointId((String) dbMap.get(ENDPOINT_ID));
- record.setStartTime(((Number) dbMap.get(START_TIME)).longValue());
- record.setEndTime(((Number) dbMap.get(END_TIME)).longValue());
- record.setLatency(((Number) dbMap.get(LATENCY)).intValue());
- record.setIsError(((Number) dbMap.get(IS_ERROR)).intValue());
- record.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
- if (StringUtil.isEmpty((String) dbMap.get(DATA_BINARY))) {
- record.setDataBinary(new byte[] {});
- } else {
- record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
- }
- record.setEncode(((Number) dbMap.get(ENCODE)).intValue());
- return record;
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecordDispatcher.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecordDispatcher.java
deleted file mode 100644
index fc95836..0000000
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecordDispatcher.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.jaeger;
-
-import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
-import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
-
-/**
- * Dispatch for Zipkin native mode spans.
- */
-public class JaegerSpanRecordDispatcher implements SourceDispatcher<JaegerSpan> {
-
- @Override
- public void dispatch(JaegerSpan source) {
- JaegerSpanRecord segment = new JaegerSpanRecord();
- segment.setTraceId(source.getTraceId());
- segment.setSpanId(source.getSpanId());
- segment.setServiceId(source.getServiceId());
- segment.setServiceInstanceId(source.getServiceInstanceId());
- segment.setEndpointName(source.getEndpointName());
- segment.setEndpointId(source.getEndpointId());
- segment.setStartTime(source.getStartTime());
- segment.setEndTime(source.getEndTime());
- segment.setLatency(source.getLatency());
- segment.setIsError(source.getIsError());
- segment.setDataBinary(source.getDataBinary());
- segment.setTimeBucket(source.getTimeBucket());
- segment.setEncode(source.getEncode());
-
- RecordStreamProcessor.getInstance().in(segment);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerStorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerStorageModuleElasticsearchProvider.java
deleted file mode 100644
index f76c1c0..0000000
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerStorageModuleElasticsearchProvider.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
-
-@Slf4j
-public class JaegerStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
- @Override
- public String name() {
- return "jaeger-elasticsearch";
- }
-
- @Override
- public void prepare() throws ServiceNotProvidedException {
- super.prepare();
- JaegerTraceQueryEsDAO traceQueryEsDAO = new JaegerTraceQueryEsDAO(elasticSearchClient);
- this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
- }
-
- @Override
- public String[] requiredModules() {
- return new String[] {CoreModule.NAME};
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
deleted file mode 100644
index cf78630..0000000
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch;
-
-import com.google.common.base.Strings;
-import com.google.protobuf.ByteString;
-import io.jaegertracing.api_v2.Model;
-import java.io.IOException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.List;
-import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
-import org.apache.skywalking.oap.server.core.query.type.LogEntity;
-import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
-import org.apache.skywalking.oap.server.core.query.type.Ref;
-import org.apache.skywalking.oap.server.core.query.type.RefType;
-import org.apache.skywalking.oap.server.core.query.type.Span;
-import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
-import org.apache.skywalking.oap.server.core.query.type.TraceState;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.RangeQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.max.Max;
-import org.elasticsearch.search.aggregations.metrics.min.Min;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.SortOrder;
-
-import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_BUCKET;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.ENDPOINT_ID;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.ENDPOINT_NAME;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.END_TIME;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.IS_ERROR;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.LATENCY;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.SERVICE_ID;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.SERVICE_INSTANCE_ID;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.START_TIME;
-import static org.apache.skywalking.oap.server.storage.plugin.jaeger.JaegerSpanRecord.TRACE_ID;
-
-public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
-
- public JaegerTraceQueryEsDAO(ElasticSearchClient client) {
- super(client);
- }
-
- @Override
- public TraceBrief queryBasicTraces(long startSecondTB,
- long endSecondTB,
- long minDuration,
- long maxDuration,
- String endpointName,
- String serviceId,
- String serviceInstanceId,
- String endpointId,
- String traceId,
- int limit,
- int from,
- TraceState traceState,
- QueryOrder queryOrder,
- final List<Tag> tags) throws IOException {
-
- SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- sourceBuilder.query(boolQueryBuilder);
- List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
-
- if (startSecondTB != 0 && endSecondTB != 0) {
- mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
- }
-
- if (minDuration != 0 || maxDuration != 0) {
- RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
- if (minDuration != 0) {
- rangeQueryBuilder.gte(minDuration);
- }
- if (maxDuration != 0) {
- rangeQueryBuilder.lte(maxDuration);
- }
- boolQueryBuilder.must().add(rangeQueryBuilder);
- }
- if (!Strings.isNullOrEmpty(endpointName)) {
- mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
- }
- if (StringUtil.isNotEmpty(serviceId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
- }
- if (StringUtil.isNotEmpty(serviceInstanceId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
- }
- if (!Strings.isNullOrEmpty(endpointId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
- }
- if (!Strings.isNullOrEmpty(traceId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
- }
- switch (traceState) {
- case ERROR:
- mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
- break;
- case SUCCESS:
- mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
- break;
- }
-
- TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID)
- .field(TRACE_ID)
- .size(limit)
- .subAggregation(AggregationBuilders.max(LATENCY)
- .field(LATENCY))
- .subAggregation(AggregationBuilders.min(START_TIME)
- .field(START_TIME));
- switch (queryOrder) {
- case BY_START_TIME:
- builder.order(BucketOrder.aggregation(START_TIME, false));
- break;
- case BY_DURATION:
- builder.order(BucketOrder.aggregation(LATENCY, false));
- break;
- }
- sourceBuilder.aggregation(builder);
-
- SearchResponse response = getClient().search(JaegerSpanRecord.INDEX_NAME, sourceBuilder);
-
- TraceBrief traceBrief = new TraceBrief();
-
- Terms terms = response.getAggregations().get(TRACE_ID);
-
- for (Terms.Bucket termsBucket : terms.getBuckets()) {
- BasicTrace basicTrace = new BasicTrace();
-
- basicTrace.setSegmentId(termsBucket.getKeyAsString());
- Min startTime = termsBucket.getAggregations().get(START_TIME);
- Max latency = termsBucket.getAggregations().get(LATENCY);
- basicTrace.setStart(String.valueOf((long) startTime.getValue()));
- basicTrace.getEndpointNames().add("");
- basicTrace.setDuration((int) latency.getValue());
- basicTrace.setError(false);
- basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
- traceBrief.getTraces().add(basicTrace);
- }
-
- return traceBrief;
- }
-
- @Override
- public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
- return Collections.emptyList();
- }
-
- @Override
- public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
- SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
- sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
- sourceBuilder.sort(START_TIME, SortOrder.ASC);
- sourceBuilder.size(1000);
-
- SearchResponse response = getClient().search(JaegerSpanRecord.INDEX_NAME, sourceBuilder);
-
- List<Span> spanList = new ArrayList<>();
-
- for (SearchHit searchHit : response.getHits().getHits()) {
- String serviceId = (String) searchHit.getSourceAsMap().get(SERVICE_ID);
- long startTime = ((Number) searchHit.getSourceAsMap().get(START_TIME)).longValue();
- long endTime = ((Number) searchHit.getSourceAsMap().get(END_TIME)).longValue();
- String dataBinaryBase64 = (String) searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
-
- Model.Span jaegerSpan = Model.Span.newBuilder()
- .mergeFrom(Base64.getDecoder().decode(dataBinaryBase64))
- .build();
-
- Span swSpan = new Span();
-
- swSpan.setTraceId(format(jaegerSpan.getTraceId()));
- swSpan.setEndpointName(jaegerSpan.getOperationName());
- swSpan.setStartTime(startTime);
- swSpan.setEndTime(endTime);
- jaegerSpan.getTagsList().forEach(keyValue -> {
- String key = keyValue.getKey();
- Model.ValueType valueVType = keyValue.getVType();
- switch (valueVType) {
- case STRING:
- swSpan.getTags().add(new KeyValue(key, keyValue.getVStr()));
- break;
- case INT64:
- swSpan.getTags().add(new KeyValue(key, keyValue.getVInt64() + ""));
- break;
- case BOOL:
- swSpan.getTags().add(new KeyValue(key, keyValue.getVBool() + ""));
- break;
- case FLOAT64:
- swSpan.getTags().add(new KeyValue(key, keyValue.getVFloat64() + ""));
- break;
- }
- swSpan.setType("Local");
- if ("span.kind".equals(key)) {
- String kind = keyValue.getVStr();
- if ("server".equals(kind) || "consumer".equals(kind)) {
- swSpan.setType("Entry");
- } else if ("client".equals(kind) || "producer".equals(kind)) {
- swSpan.setType("Exit");
- }
- }
- });
- jaegerSpan.getLogsList().forEach(log -> {
- LogEntity entity = new LogEntity();
- boolean hasTimestamp = log.hasTimestamp();
- if (hasTimestamp) {
- long time = Instant.ofEpochSecond(log.getTimestamp().getSeconds(), log.getTimestamp().getNanos())
- .toEpochMilli();
- entity.setTime(time);
- }
- log.getFieldsList().forEach(field -> {
- String key = field.getKey();
- Model.ValueType valueVType = field.getVType();
- switch (valueVType) {
- case STRING:
- entity.getData().add(new KeyValue(key, field.getVStr()));
- break;
- case INT64:
- entity.getData().add(new KeyValue(key, field.getVInt64() + ""));
- break;
- case BOOL:
- entity.getData().add(new KeyValue(key, field.getVBool() + ""));
- break;
- case FLOAT64:
- entity.getData().add(new KeyValue(key, field.getVFloat64() + ""));
- break;
- }
- });
-
- swSpan.getLogs().add(entity);
- });
-
- final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
- serviceId);
-
- swSpan.setServiceCode(serviceIDDefinition.getName());
- swSpan.setSpanId(0);
- swSpan.setParentSpanId(-1);
- String spanId = id(format(jaegerSpan.getTraceId()), format(jaegerSpan.getSpanId()));
- swSpan.setSegmentSpanId(spanId);
- swSpan.setSegmentId(spanId);
-
- List<Model.SpanRef> spanReferencesList = jaegerSpan.getReferencesList();
- if (spanReferencesList.size() > 0) {
- spanReferencesList.forEach(jaegerRef -> {
- Ref ref = new Ref();
- ref.setTraceId(format(jaegerRef.getTraceId()));
- String parentId = id(format(jaegerRef.getTraceId()), format(jaegerRef.getSpanId()));
- ref.setParentSegmentId(parentId);
- ref.setType(RefType.CROSS_PROCESS);
- ref.setParentSpanId(0);
-
- swSpan.getRefs().add(ref);
- swSpan.setSegmentParentSpanId(parentId);
- });
- } else {
- swSpan.setRoot(true);
- swSpan.setSegmentParentSpanId("");
- }
- spanList.add(swSpan);
- }
- return spanList;
- }
-
- private String id(String traceId, String spanId) {
- return traceId + "_" + spanId;
- }
-
- private String format(ByteString bytes) {
- Base64.Encoder encoder = Base64.getEncoder();
- return encoder.encodeToString(bytes.toByteArray());
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
deleted file mode 100755
index eac6392..0000000
--- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-org.apache.skywalking.oap.server.storage.plugin.jaeger.elasticsearch.JaegerStorageModuleElasticsearchProvider
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/pom.xml
similarity index 77%
rename from oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
rename to oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/pom.xml
index 3d8159c..0c2e40f 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/pom.xml
@@ -25,15 +25,23 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>storage-zipkin-plugin</artifactId>
+ <artifactId>storage-zipkin-elasticsearch7-plugin</artifactId>
+ <properties>
+ <elasticsearch.version>7.5.0</elasticsearch.version>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
- <artifactId>storage-elasticsearch-plugin</artifactId>
+ <artifactId>storage-elasticsearch7-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</dependency>
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
similarity index 94%
rename from oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
rename to oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
index d8a79c8..e277b65 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java
@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.storage.plugin.zipkin;
+import java.util.ArrayList;
+import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
@@ -75,4 +77,7 @@ public class ZipkinSpan extends Source {
@Setter
@Getter
private int encode;
+ @Setter
+ @Getter
+ private List<String> tags = new ArrayList<>();
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
similarity index 95%
rename from oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
rename to oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
index 601a9f9..434b3eb 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import java.util.Base64;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
@@ -50,6 +51,7 @@ public class ZipkinSpanRecord extends Record {
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String ENCODE = "encode";
+ public static final String TAGS = "tags";
@Setter
@Getter
@@ -99,6 +101,10 @@ public class ZipkinSpanRecord extends Record {
@Getter
@Column(columnName = ENCODE)
private int encode;
+ @Setter
+ @Getter
+ @Column(columnName = TAGS)
+ private List<String> tags;
@Override
public String id() {
@@ -127,6 +133,7 @@ public class ZipkinSpanRecord extends Record {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(ENCODE, storageData.getEncode());
+ map.put(TAGS, storageData.getTags());
return map;
}
@@ -150,6 +157,7 @@ public class ZipkinSpanRecord extends Record {
record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
}
record.setEncode(((Number) dbMap.get(ENCODE)).intValue());
+ // Don't read the tags as they has been in the data binary already.
return record;
}
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
similarity index 97%
rename from oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
rename to oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
index bd10c43..8cfc7b1 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java
@@ -42,6 +42,7 @@ public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan>
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setEncode(source.getEncode());
+ segment.setTags(source.getTags());
RecordStreamProcessor.getInstance().in(segment);
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
similarity index 87%
rename from oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
rename to oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
index 3d6dc0a..5cc90b1 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
@@ -22,20 +22,20 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider;
@Slf4j
-public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
+public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearch7Provider {
@Override
public String name() {
- return "zipkin-elasticsearch";
+ return "zipkin-elasticsearch7";
}
@Override
public void prepare() throws ServiceNotProvidedException {
super.prepare();
- final ZipkinTraceQueryEsDAO traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient);
+ final ZipkinTraceQueryEs7DAO traceQueryEsDAO = new ZipkinTraceQueryEs7DAO(elasticSearch7Client);
this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO);
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEs7DAO.java
similarity index 79%
rename from oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
rename to oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEs7DAO.java
index 43989e9..b930c43 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEs7DAO.java
@@ -39,6 +39,7 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
import org.elasticsearch.action.search.SearchResponse;
@@ -47,12 +48,6 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.max.Max;
-import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import zipkin2.Span;
@@ -65,12 +60,13 @@ import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanR
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.SERVICE_ID;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.SERVICE_INSTANCE_ID;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.START_TIME;
+import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.TAGS;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.TIME_BUCKET;
import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.TRACE_ID;
-public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
+public class ZipkinTraceQueryEs7DAO extends EsDAO implements ITraceQueryDAO {
- public ZipkinTraceQueryEsDAO(ElasticSearchClient client) {
+ public ZipkinTraceQueryEs7DAO(ElasticSearchClient client) {
super(client);
}
@@ -108,22 +104,22 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
if (maxDuration != 0) {
rangeQueryBuilder.lte(maxDuration);
}
- boolQueryBuilder.must().add(rangeQueryBuilder);
+ mustQueryList.add(rangeQueryBuilder);
}
if (!Strings.isNullOrEmpty(endpointName)) {
mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
}
if (StringUtil.isNotEmpty(serviceId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
+ mustQueryList.add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
+ mustQueryList.add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (!Strings.isNullOrEmpty(endpointId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
+ mustQueryList.add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(traceId)) {
- boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
+ mustQueryList.add(QueryBuilders.termQuery(TRACE_ID, traceId));
}
switch (traceState) {
case ERROR:
@@ -133,41 +129,43 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
break;
}
+ if (CollectionUtils.isNotEmpty(tags)) {
+ BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
+ tags.forEach(tag -> {
+ tagMatchQuery.must(QueryBuilders.termQuery(TAGS, tag.toString()));
+ });
+ mustQueryList.add(tagMatchQuery);
+ }
- TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID)
- .field(TRACE_ID)
- .size(limit)
- .subAggregation(AggregationBuilders.max(LATENCY)
- .field(LATENCY))
- .subAggregation(AggregationBuilders.min(START_TIME)
- .field(START_TIME));
switch (queryOrder) {
case BY_START_TIME:
- builder.order(BucketOrder.aggregation(START_TIME, false));
+ sourceBuilder.sort(START_TIME, SortOrder.DESC);
break;
case BY_DURATION:
- builder.order(BucketOrder.aggregation(LATENCY, false));
+ sourceBuilder.sort(LATENCY, SortOrder.DESC);
break;
}
- sourceBuilder.aggregation(builder);
+ sourceBuilder.size(limit);
+ sourceBuilder.from(from);
SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
+ traceBrief.setTotal((int) response.getHits().getTotalHits().value);
- Terms terms = response.getAggregations().get(TRACE_ID);
-
- for (Terms.Bucket termsBucket : terms.getBuckets()) {
+ for (SearchHit searchHit : response.getHits().getHits()) {
BasicTrace basicTrace = new BasicTrace();
- basicTrace.setSegmentId(termsBucket.getKeyAsString());
- Min startTime = termsBucket.getAggregations().get(START_TIME);
- Max latency = termsBucket.getAggregations().get(LATENCY);
- basicTrace.setStart(String.valueOf((long) startTime.getValue()));
- basicTrace.getEndpointNames().add("");
- basicTrace.setDuration((int) latency.getValue());
- basicTrace.setError(false);
- basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
+ final ZipkinSpanRecord zipkinSpanRecord = new ZipkinSpanRecord.Builder().storage2Entity(
+ searchHit.getSourceAsMap());
+
+ basicTrace.setSegmentId(zipkinSpanRecord.getSpanId());
+ basicTrace.setStart(String.valueOf((long) zipkinSpanRecord.getStartTime()));
+ // Show trace id as the name
+ basicTrace.getEndpointNames().add(zipkinSpanRecord.getEndpointName());
+ basicTrace.setDuration(zipkinSpanRecord.getLatency());
+ basicTrace.setError(BooleanUtils.valueToBoolean(zipkinSpanRecord.getIsError()));
+ basicTrace.getTraceIds().add(zipkinSpanRecord.getTraceId());
traceBrief.getTraces().add(basicTrace);
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
similarity index 100%
rename from oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
rename to oap-server/server-storage-plugin/storage-zipkin-elasticsearch7-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt
index fcb6310..14abf78 100755
--- a/tools/dependencies/known-oap-backend-dependencies.txt
+++ b/tools/dependencies/known-oap-backend-dependencies.txt
@@ -11,7 +11,6 @@ bcpkix-jdk15on-1.66.jar
bcprov-ext-jdk15on-1.66.jar
bcprov-jdk15on-1.66.jar
builder-annotations-0.22.0.jar
-caffeine-2.6.2.jar
checker-qual-2.8.1.jar
client-java-10.0.0.jar
client-java-api-10.0.0.jar