You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/29 04:45:20 UTC
[incubator-skywalking] branch zipkin-trace updated: Finish some
tests.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch zipkin-trace
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/zipkin-trace by this push:
new a34d9ba Finish some tests.
a34d9ba is described below
commit a34d9bac2d4641ac43fc7ba0f1c4fb23ca82e5d2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Mar 28 21:45:08 2019 -0700
Finish some tests.
---
.../oap/server/core/query/TraceQueryService.java | 20 +-
.../oap/server/core/query/entity/KeyValue.java | 8 +
.../server/core/storage/query/ITraceQueryDAO.java | 8 +
.../server/receiver/zipkin/trace/SpanForward.java | 8 +-
.../StorageModuleElasticsearchProvider.java | 4 +-
.../elasticsearch/query/TraceQueryEsDAO.java | 4 +
.../plugin/jdbc/h2/dao/H2TraceQueryDAO.java | 4 +
.../storage-zipkin-plugin/pom.xml | 4 +
.../StorageModuleElasticsearchProvider.java | 114 -----------
.../ZipkinStorageModuleElasticsearchProvider.java | 57 ++++++
.../elasticsearch/ZipkinTraceQueryEsDAO.java | 222 +++++++++++++++++++++
...alking.oap.server.library.module.ModuleProvider | 2 +-
12 files changed, 326 insertions(+), 129 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
index b735525..d0a9df7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
@@ -103,14 +103,18 @@ public class TraceQueryService implements Service {
Trace trace = new Trace();
List<SegmentRecord> segmentRecords = getTraceQueryDAO().queryByTraceId(traceId);
- for (SegmentRecord segment : segmentRecords) {
- if (nonNull(segment)) {
- if (segment.getVersion() == 2) {
- SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
- trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
- } else {
- TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
- trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+ if (segmentRecords.isEmpty()) {
+ trace.getSpans().addAll(getTraceQueryDAO().doFlexibleTraceQuery(traceId));
+ } else {
+ for (SegmentRecord segment : segmentRecords) {
+ if (nonNull(segment)) {
+ if (segment.getVersion() == 2) {
+ SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
+ trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+ } else {
+ TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
+ trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+ }
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
index 46e355e..b6cbd95 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
@@ -28,4 +28,12 @@ import lombok.*;
public class KeyValue {
private String key;
private String value;
+
+ public KeyValue(String key, String value) { // Convenience constructor: sets both fields in one call.
+ this.key = key;
+ this.value = value;
+ }
+
+ public KeyValue() { // Explicit no-arg constructor; Java stops generating the implicit one once another constructor is declared.
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
index fc98d03..6517d2f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
@@ -34,4 +34,12 @@ public interface ITraceQueryDAO extends Service {
int limit, int from, TraceState traceState, QueryOrder queryOrder) throws IOException;
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
+
+ /**
+ * Loads the spans of a trace for storages that do not keep traces as native SkyWalking
+ * segment records. {@code TraceQueryService} falls back to this method only when
+ * {@link #queryByTraceId(String)} returns an empty list.
+ *
+ * @param traceId the ID of the trace to load
+ * @return the spans of the trace in query-entity form; implementations that have no such
+ * data return an empty list
+ * @throws IOException when the storage implementation reports an I/O failure
+ */
+ List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
index cf16a8e..8cf3ed0 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -88,14 +88,14 @@ public class SpanForward {
if (!StringUtil.isEmpty(spanName)) {
zipkinSpan.setEndpointName(spanName);
}
- long timestampAsLong = span.timestampAsLong();
- zipkinSpan.setStartTime(timestampAsLong);
- if (timestampAsLong != 0) {
+ long startTime = span.timestampAsLong() / 1000;
+ zipkinSpan.setStartTime(startTime);
+ if (startTime != 0) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(zipkinSpan.getStartTime());
zipkinSpan.setTimeBucket(timeBucket);
}
- zipkinSpan.setEndTime(timestampAsLong + span.durationAsLong());
+ zipkinSpan.setEndTime(startTime + span.durationAsLong() / 1000);
zipkinSpan.setIsError(BooleanUtils.booleanToValue(false));
zipkinSpan.setEncode(SpanEncode.PROTO3);
zipkinSpan.setLatency((int)span.durationAsLong());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 7d1a713..ed3bc4b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -70,8 +70,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
- private final StorageModuleElasticsearchConfig config;
- private ElasticSearchClient elasticSearchClient;
+ protected final StorageModuleElasticsearchConfig config;
+ protected ElasticSearchClient elasticSearchClient;
public StorageModuleElasticsearchProvider() {
super();
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index ac5645a..4528189 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -147,4 +147,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
return segmentRecords;
}
+
+ @Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException { // Stub: this DAO performs no flexible query.
+ return Collections.emptyList(); // Always empty, so the caller adds no extra spans for this storage.
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index ed8bf73..ec231e5 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -166,6 +166,10 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
return segmentRecords;
}
+ @Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException { // Stub: this DAO performs no flexible query.
+ return Collections.emptyList(); // Always empty, so the caller adds no extra spans for this storage.
+ }
+
protected JDBCHikariCPClient getClient() {
return h2Client;
}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
index 658b47a..61d313b 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
@@ -35,5 +35,9 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.zipkin.zipkin2</groupId>
+ <artifactId>zipkin</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
deleted file mode 100644
index cabd17c..0000000
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
-
-import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.*;
-import org.apache.skywalking.oap.server.core.storage.cache.*;
-import org.apache.skywalking.oap.server.core.storage.query.*;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
-import org.slf4j.*;
-
-/**
- * @author peng-yongsheng
- */
-public class StorageModuleElasticsearchProvider extends ModuleProvider {
-
- private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
-
- private final StorageModuleElasticsearchConfig config;
- private ElasticSearchClient elasticSearchClient;
-
- public StorageModuleElasticsearchProvider() {
- super();
- this.config = new StorageModuleElasticsearchConfig();
- }
-
- @Override
- public String name() {
- return "zipkin-elasticsearch";
- }
-
- @Override
- public Class<? extends ModuleDefine> module() {
- return StorageModule.class;
- }
-
- @Override
- public ModuleConfig createConfigBeanIfAbsent() {
- return config;
- }
-
- @Override
- public void prepare() throws ServiceNotProvidedException {
- if (!StringUtil.isEmpty(config.getNameSpace())) {
- config.setNameSpace(config.getNameSpace().toLowerCase());
- }
- elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword());
-
- this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
- this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
-
- this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
- this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
- this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
-
- this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
- this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
- }
-
- @Override
- public void start() throws ModuleStartException {
- try {
- elasticSearchClient.connect();
-
- StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
- installer.install(elasticSearchClient);
-
- RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
- lockInstaller.install();
- } catch (StorageException e) {
- throw new ModuleStartException(e.getMessage(), e);
- }
- }
-
- @Override
- public void notifyAfterCompleted() {
- }
-
- @Override
- public String[] requiredModules() {
- return new String[] {CoreModule.NAME};
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
new file mode 100644
index 0000000..e529fda
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
+import org.slf4j.*;
+
+/**
+ * Elasticsearch storage provider for Zipkin trace data.
+ *
+ * It inherits the whole setup of {@link StorageModuleElasticsearchProvider} and, after the parent
+ * has prepared itself, registers {@link ZipkinTraceQueryEsDAO} as the {@link ITraceQueryDAO}
+ * implementation.
+ *
+ * @author peng-yongsheng
+ */
+public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(ZipkinStorageModuleElasticsearchProvider.class);
+
+    /** Created in {@link #prepare()}; receives its cache in {@link #notifyAfterCompleted()}. */
+    private ZipkinTraceQueryEsDAO traceQueryEsDAO;
+
+    /**
+     * @return the provider name, {@code zipkin-elasticsearch}
+     */
+    @Override
+    public String name() {
+        return "zipkin-elasticsearch";
+    }
+
+    /**
+     * Runs the parent preparation first, then builds the Zipkin trace query DAO on the parent's
+     * Elasticsearch client and registers it for {@link ITraceQueryDAO}.
+     */
+    @Override
+    public void prepare() throws ServiceNotProvidedException {
+        super.prepare();
+        ZipkinTraceQueryEsDAO zipkinDao = new ZipkinTraceQueryEsDAO(elasticSearchClient);
+        this.traceQueryEsDAO = zipkinDao;
+        registerServiceImplementation(ITraceQueryDAO.class, zipkinDao);
+    }
+
+    /**
+     * Looks up the {@link ServiceInventoryCache} service from the core module and hands it to the
+     * trace query DAO created in {@link #prepare()}.
+     */
+    @Override
+    public void notifyAfterCompleted() {
+        super.notifyAfterCompleted();
+        ServiceInventoryCache inventoryCache =
+            getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
+        traceQueryEsDAO.setServiceInventoryCache(inventoryCache);
+    }
+
+    /**
+     * @return the modules this provider depends on; only the core module
+     */
+    @Override
+    public String[] requiredModules() {
+        String[] required = {CoreModule.NAME};
+        return required;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
new file mode 100644
index 0000000..448e745
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.*;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.*;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.aggregations.metrics.min.Min;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
+
+import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.*;
+
+/**
+ * Trace query DAO that serves queries from the Zipkin span index ({@link ZipkinSpanRecord#INDEX_NAME})
+ * instead of SkyWalking segment records.
+ *
+ * {@link #queryByTraceId(String)} deliberately returns nothing; trace details are produced by
+ * {@link #doFlexibleTraceQuery(String)}, which decodes the stored Zipkin spans and converts them
+ * into SkyWalking query entities.
+ */
+public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
+    /** Resolves service IDs to names; must be set before {@link #doFlexibleTraceQuery(String)} is used. */
+    @Setter
+    private ServiceInventoryCache serviceInventoryCache;
+
+    public ZipkinTraceQueryEsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    /**
+     * Lists traces matching the given conditions, one {@link BasicTrace} per trace ID.
+     *
+     * A condition is skipped when its value is {@code 0} (numeric arguments) or null/empty
+     * (string arguments). Results come from a terms aggregation on the trace ID, so
+     * {@code limit} bounds the number of traces while {@code from} is not applied. Every
+     * returned trace is reported as non-error and with a single empty endpoint name.
+     *
+     * @return the matching traces; the total count is not filled in
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override
+    public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
+        String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
+        TraceState traceState, QueryOrder queryOrder) throws IOException {
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+        sourceBuilder.query(boolQueryBuilder);
+        // Relies on must() handing out the builder's live clause list: adding to it extends the query.
+        List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
+
+        if (startSecondTB != 0 && endSecondTB != 0) {
+            mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
+        }
+
+        if (minDuration != 0 || maxDuration != 0) {
+            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
+            if (minDuration != 0) {
+                rangeQueryBuilder.gte(minDuration);
+            }
+            if (maxDuration != 0) {
+                rangeQueryBuilder.lte(maxDuration);
+            }
+            mustQueryList.add(rangeQueryBuilder);
+        }
+        if (!Strings.isNullOrEmpty(endpointName)) {
+            mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
+        }
+        if (serviceId != 0) {
+            mustQueryList.add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
+        }
+        if (serviceInstanceId != 0) {
+            mustQueryList.add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
+        }
+        if (endpointId != 0) {
+            mustQueryList.add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
+        }
+        if (!Strings.isNullOrEmpty(traceId)) {
+            mustQueryList.add(QueryBuilders.termQuery(TRACE_ID, traceId));
+        }
+        // Any other trace state adds no error filter.
+        switch (traceState) {
+            case ERROR:
+                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
+                break;
+            case SUCCESS:
+                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
+                break;
+        }
+
+        // One bucket per trace ID, carrying the longest latency and the earliest start time of its spans.
+        TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit)
+            .subAggregation(
+                AggregationBuilders.max(LATENCY).field(LATENCY)
+            )
+            .subAggregation(
+                AggregationBuilders.min(START_TIME).field(START_TIME)
+            );
+        // Both orderings are descending (asc = false).
+        switch (queryOrder) {
+            case BY_START_TIME:
+                builder.order(BucketOrder.aggregation(START_TIME, false));
+                break;
+            case BY_DURATION:
+                builder.order(BucketOrder.aggregation(LATENCY, false));
+                break;
+        }
+        sourceBuilder.aggregation(builder);
+
+        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
+
+        TraceBrief traceBrief = new TraceBrief();
+
+        Terms terms = response.getAggregations().get(TRACE_ID);
+
+        for (Terms.Bucket termsBucket : terms.getBuckets()) {
+            BasicTrace basicTrace = new BasicTrace();
+
+            // The trace ID doubles as the segment ID.
+            basicTrace.setSegmentId(termsBucket.getKeyAsString());
+            Min startTime = termsBucket.getAggregations().get(START_TIME);
+            Max latency = termsBucket.getAggregations().get(LATENCY);
+            basicTrace.setStart(String.valueOf((long)startTime.getValue()));
+            basicTrace.getEndpointNames().add("");
+            basicTrace.setDuration((int)latency.getValue());
+            basicTrace.setError(false);
+            basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
+            traceBrief.getTraces().add(basicTrace);
+        }
+
+        return traceBrief;
+    }
+
+    /**
+     * Always returns an empty list, which makes the caller fall back to
+     * {@link #doFlexibleTraceQuery(String)}.
+     */
+    @Override
+    public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Loads up to 1000 Zipkin spans of the given trace, ordered by start time, and converts each
+     * one into a SkyWalking query span.
+     *
+     * Every Zipkin span becomes its own single-span segment (span ID 0, parent span ID -1) whose
+     * segment ID is the Zipkin span ID. The earliest span is marked as root; every later span
+     * gets a cross-process reference to its Zipkin parent ID.
+     *
+     * @param traceId the Zipkin trace ID to look up
+     * @return the converted spans; empty when the trace is unknown
+     * @throws IOException propagated from the Elasticsearch client
+     */
+    @Override
+    public List<org.apache.skywalking.oap.server.core.query.entity.Span> doFlexibleTraceQuery(
+        String traceId) throws IOException {
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
+        sourceBuilder.sort(START_TIME, SortOrder.ASC);
+        sourceBuilder.size(1000);
+
+        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
+
+        List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();
+
+        boolean isFirst = true;
+        for (SearchHit searchHit : response.getHits().getHits()) {
+            Map<String, Object> source = searchHit.getSourceAsMap();
+            int serviceId = ((Number)source.get(SERVICE_ID)).intValue();
+            String dataBinaryBase64 = (String)source.get(SegmentRecord.DATA_BINARY);
+            Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64));
+
+            org.apache.skywalking.oap.server.core.query.entity.Span swSpan = new org.apache.skywalking.oap.server.core.query.entity.Span();
+
+            swSpan.setTraceId(span.traceId());
+            swSpan.setEndpointName(span.name());
+            // Zipkin reports microseconds; the query entity uses milliseconds. timestampAsLong()
+            // yields 0 for a span without a timestamp instead of failing on a null Long.
+            swSpan.setStartTime(span.timestampAsLong() / 1000);
+            swSpan.setEndTime(swSpan.getStartTime() + span.durationAsLong() / 1000);
+            span.tags().forEach((key, value) -> swSpan.getTags().add(new KeyValue(key, value)));
+            span.annotations().forEach(annotation -> {
+                LogEntity entity = new LogEntity();
+                entity.setTime(annotation.timestamp() / 1000);
+                entity.getData().add(new KeyValue("annotation", annotation.value()));
+                swSpan.getLogs().add(entity);
+            });
+            if (serviceId != Const.NONE) {
+                swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
+            }
+            swSpan.setSpanId(0);
+            swSpan.setParentSpanId(-1);
+            swSpan.setSegmentSpanId(span.id());
+            swSpan.setSegmentId(span.id());
+            swSpan.setType(toSpanType(span.kind()));
+
+            if (isFirst) {
+                // NOTE(review): the root is chosen by earliest start time, not by a missing parent ID.
+                swSpan.setRoot(true);
+                swSpan.setSegmentParentSpanId("");
+                isFirst = false;
+            } else {
+                Ref ref = new Ref();
+                ref.setTraceId(span.traceId());
+                ref.setParentSegmentId(span.parentId());
+                ref.setType(RefType.CROSS_PROCESS);
+                ref.setParentSpanId(0);
+
+                swSpan.getRefs().add(ref);
+                swSpan.setSegmentParentSpanId(span.parentId());
+            }
+            spanList.add(swSpan);
+        }
+        return spanList;
+    }
+
+    /**
+     * Maps a Zipkin span kind to the SkyWalking span type name: the receiving side of a call
+     * (SERVER, CONSUMER) is an "Entry" span, the calling side (CLIENT, PRODUCER) is an "Exit"
+     * span, and a span without a kind is "Local".
+     */
+    private static String toSpanType(Span.Kind kind) {
+        // Zipkin leaves the kind unset for in-process spans; switching on null would throw.
+        if (kind == null) {
+            return "Local";
+        }
+        switch (kind) {
+            case SERVER:
+            case CONSUMER:
+                return "Entry";
+            case CLIENT:
+            case PRODUCER:
+                return "Exit";
+            default:
+                return "Local";
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
index 5f028bc..de8e186 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -16,4 +16,4 @@
#
#
-org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.StorageModuleElasticsearchProvider
\ No newline at end of file
+org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.ZipkinStorageModuleElasticsearchProvider
\ No newline at end of file