You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/29 04:45:20 UTC

[incubator-skywalking] branch zipkin-trace updated: Finish some tests.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch zipkin-trace
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/zipkin-trace by this push:
     new a34d9ba  Finish some tests.
a34d9ba is described below

commit a34d9bac2d4641ac43fc7ba0f1c4fb23ca82e5d2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Mar 28 21:45:08 2019 -0700

    Finish some tests.
---
 .../oap/server/core/query/TraceQueryService.java   |  20 +-
 .../oap/server/core/query/entity/KeyValue.java     |   8 +
 .../server/core/storage/query/ITraceQueryDAO.java  |   8 +
 .../server/receiver/zipkin/trace/SpanForward.java  |   8 +-
 .../StorageModuleElasticsearchProvider.java        |   4 +-
 .../elasticsearch/query/TraceQueryEsDAO.java       |   4 +
 .../plugin/jdbc/h2/dao/H2TraceQueryDAO.java        |   4 +
 .../storage-zipkin-plugin/pom.xml                  |   4 +
 .../StorageModuleElasticsearchProvider.java        | 114 -----------
 .../ZipkinStorageModuleElasticsearchProvider.java  |  57 ++++++
 .../elasticsearch/ZipkinTraceQueryEsDAO.java       | 222 +++++++++++++++++++++
 ...alking.oap.server.library.module.ModuleProvider |   2 +-
 12 files changed, 326 insertions(+), 129 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
index b735525..d0a9df7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
@@ -103,14 +103,18 @@ public class TraceQueryService implements Service {
         Trace trace = new Trace();
 
         List<SegmentRecord> segmentRecords = getTraceQueryDAO().queryByTraceId(traceId);
-        for (SegmentRecord segment : segmentRecords) {
-            if (nonNull(segment)) {
-                if (segment.getVersion() == 2) {
-                    SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
-                    trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
-                } else {
-                    TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
-                    trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+        if (segmentRecords.isEmpty()) {
+            trace.getSpans().addAll(getTraceQueryDAO().doFlexibleTraceQuery(traceId));
+        } else {
+            for (SegmentRecord segment : segmentRecords) {
+                if (nonNull(segment)) {
+                    if (segment.getVersion() == 2) {
+                        SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
+                        trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+                    } else {
+                        TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
+                        trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
+                    }
                 }
             }
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
index 46e355e..b6cbd95 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/KeyValue.java
@@ -28,4 +28,12 @@ import lombok.*;
 public class KeyValue {
     private String key;
     private String value;
+
+    public KeyValue(String key, String value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public KeyValue() {
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
index fc98d03..6517d2f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java
@@ -34,4 +34,12 @@ public interface ITraceQueryDAO extends Service {
         int limit, int from, TraceState traceState, QueryOrder queryOrder) throws IOException;
 
     List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
+
+    /**
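+     * This method gives more flexibility to non-native trace storage, which returns spans directly.
+     * @param traceId id of the trace to query
+     * @return the spans found for the trace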
+     * @throws IOException
+     */
+    List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
 }
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
index cf16a8e..8cf3ed0 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -88,14 +88,14 @@ public class SpanForward {
             if (!StringUtil.isEmpty(spanName)) {
                 zipkinSpan.setEndpointName(spanName);
             }
-            long timestampAsLong = span.timestampAsLong();
-            zipkinSpan.setStartTime(timestampAsLong);
-            if (timestampAsLong != 0) {
+            long startTime = span.timestampAsLong() / 1000;
+            zipkinSpan.setStartTime(startTime);
+            if (startTime != 0) {
                 long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(zipkinSpan.getStartTime());
                 zipkinSpan.setTimeBucket(timeBucket);
             }
 
-            zipkinSpan.setEndTime(timestampAsLong + span.durationAsLong());
+            zipkinSpan.setEndTime(startTime + span.durationAsLong() / 1000);
             zipkinSpan.setIsError(BooleanUtils.booleanToValue(false));
             zipkinSpan.setEncode(SpanEncode.PROTO3);
             zipkinSpan.setLatency((int)span.durationAsLong());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 7d1a713..ed3bc4b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -70,8 +70,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
 
     private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
 
-    private final StorageModuleElasticsearchConfig config;
-    private ElasticSearchClient elasticSearchClient;
+    protected final StorageModuleElasticsearchConfig config;
+    protected ElasticSearchClient elasticSearchClient;
 
     public StorageModuleElasticsearchProvider() {
         super();
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
index ac5645a..4528189 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java
@@ -147,4 +147,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
         }
         return segmentRecords;
     }
+
+    @Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
+        return Collections.emptyList(); // this DAO has no flexible-query data; callers always get an empty list
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index ed8bf73..ec231e5 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -166,6 +166,10 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
         return segmentRecords;
     }
 
+    @Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
+        return Collections.emptyList(); // this DAO has no flexible-query data; callers always get an empty list
+    }
+
     protected JDBCHikariCPClient getClient() {
         return h2Client;
     }
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
index 658b47a..61d313b 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml
@@ -35,5 +35,9 @@
             <artifactId>storage-elasticsearch-plugin</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.zipkin.zipkin2</groupId>
+            <artifactId>zipkin</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
deleted file mode 100644
index cabd17c..0000000
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
-
-import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.*;
-import org.apache.skywalking.oap.server.core.storage.cache.*;
-import org.apache.skywalking.oap.server.core.storage.query.*;
-import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.oap.server.library.module.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
-import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
-import org.slf4j.*;
-
-/**
- * @author peng-yongsheng
- */
-public class StorageModuleElasticsearchProvider extends ModuleProvider {
-
-    private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
-
-    private final StorageModuleElasticsearchConfig config;
-    private ElasticSearchClient elasticSearchClient;
-
-    public StorageModuleElasticsearchProvider() {
-        super();
-        this.config = new StorageModuleElasticsearchConfig();
-    }
-
-    @Override
-    public String name() {
-        return "zipkin-elasticsearch";
-    }
-
-    @Override
-    public Class<? extends ModuleDefine> module() {
-        return StorageModule.class;
-    }
-
-    @Override
-    public ModuleConfig createConfigBeanIfAbsent() {
-        return config;
-    }
-
-    @Override
-    public void prepare() throws ServiceNotProvidedException {
-        if (!StringUtil.isEmpty(config.getNameSpace())) {
-            config.setNameSpace(config.getNameSpace().toLowerCase());
-        }
-        elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword());
-
-        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
-        this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
-        this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
-
-        this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
-        this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
-
-        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
-    }
-
-    @Override
-    public void start() throws ModuleStartException {
-        try {
-            elasticSearchClient.connect();
-
-            StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
-            installer.install(elasticSearchClient);
-
-            RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
-            lockInstaller.install();
-        } catch (StorageException e) {
-            throw new ModuleStartException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void notifyAfterCompleted() {
-    }
-
-    @Override
-    public String[] requiredModules() {
-        return new String[] {CoreModule.NAME};
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
new file mode 100644
index 0000000..e529fda
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinStorageModuleElasticsearchProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ZipkinStorageModuleElasticsearchProvider extends StorageModuleElasticsearchProvider {
+    // Elasticsearch storage provider that registers a Zipkin-specific ITraceQueryDAO under its own provider name.
+    private static final Logger logger = LoggerFactory.getLogger(ZipkinStorageModuleElasticsearchProvider.class);
+    private ZipkinTraceQueryEsDAO traceQueryEsDAO;
+
+    @Override
+    public String name() {
+        return "zipkin-elasticsearch";
+    }
+
+    @Override
+    public void prepare() throws ServiceNotProvidedException {
+        super.prepare();
+        traceQueryEsDAO = new ZipkinTraceQueryEsDAO(elasticSearchClient);
+        this.registerServiceImplementation(ITraceQueryDAO.class, traceQueryEsDAO); // NOTE(review): presumably replaces the DAO registered by super.prepare() - confirm
+    }
+    // Injects the core module's ServiceInventoryCache into the DAO that prepare() created.
+    @Override public void notifyAfterCompleted() {
+        super.notifyAfterCompleted();
+        traceQueryEsDAO.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
+    }
+
+    @Override
+    public String[] requiredModules() {
+        return new String[] {CoreModule.NAME};
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
new file mode 100644
index 0000000..448e745
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.*;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.query.entity.*;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.*;
+import org.elasticsearch.search.aggregations.bucket.terms.*;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.aggregations.metrics.min.Min;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
+
+import static org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord.*;
+
+public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
+    @Setter
+    private ServiceInventoryCache serviceInventoryCache;
+
+    public ZipkinTraceQueryEsDAO(
+        ElasticSearchClient client) {
+        super(client);
+    }
+
+    @Override
+    public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
+        String endpointName, int serviceId, int serviceInstanceId, int endpointId, String traceId, int limit, int from,
+        TraceState traceState, QueryOrder queryOrder) throws IOException {
+        // Searches the Zipkin span index with the given filters and returns one BasicTrace per trace-id bucket.
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+        sourceBuilder.query(boolQueryBuilder);
+        List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
+        // NOTE(review): mustQueryList and boolQueryBuilder.must() are used interchangeably below - presumably the same backing list; confirm.
+        if (startSecondTB != 0 && endSecondTB != 0) { // numeric filters equal to 0 add no clause
+            mustQueryList.add(QueryBuilders.rangeQuery(TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
+        }
+
+        if (minDuration != 0 || maxDuration != 0) {
+            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(LATENCY);
+            if (minDuration != 0) {
+                rangeQueryBuilder.gte(minDuration);
+            }
+            if (maxDuration != 0) {
+                rangeQueryBuilder.lte(maxDuration);
+            }
+            boolQueryBuilder.must().add(rangeQueryBuilder);
+        }
+        if (!Strings.isNullOrEmpty(endpointName)) {
+            mustQueryList.add(QueryBuilders.matchPhraseQuery(ENDPOINT_NAME, endpointName));
+        }
+        if (serviceId != 0) {
+            boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_ID, serviceId));
+        }
+        if (serviceInstanceId != 0) {
+            boolQueryBuilder.must().add(QueryBuilders.termQuery(SERVICE_INSTANCE_ID, serviceInstanceId));
+        }
+        if (endpointId != 0) {
+            boolQueryBuilder.must().add(QueryBuilders.termQuery(ENDPOINT_ID, endpointId));
+        }
+        if (!Strings.isNullOrEmpty(traceId)) {
+            boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
+        }
+        switch (traceState) { // states other than ERROR and SUCCESS add no IS_ERROR clause
+            case ERROR:
+                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.TRUE));
+                break;
+            case SUCCESS:
+                mustQueryList.add(QueryBuilders.matchQuery(IS_ERROR, BooleanUtils.FALSE));
+                break;
+        }
+        // One terms bucket per trace id, capped at limit; note that the from argument is never applied in this method.
+        TermsAggregationBuilder builder = AggregationBuilders.terms(TRACE_ID).field(TRACE_ID).size(limit)
+            .subAggregation(
+                AggregationBuilders.max(LATENCY).field(LATENCY)
+            )
+            .subAggregation(
+                AggregationBuilders.min(START_TIME).field(START_TIME)
+            );
+        switch (queryOrder) { // second argument is asc=false, i.e. descending; other orders leave the default
+            case BY_START_TIME:
+                builder.order(BucketOrder.aggregation(START_TIME, false));
+                break;
+            case BY_DURATION:
+                builder.order(BucketOrder.aggregation(LATENCY, false));
+                break;
+        }
+        sourceBuilder.aggregation(builder);
+
+        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
+
+        TraceBrief traceBrief = new TraceBrief();
+
+        Terms terms = response.getAggregations().get(TRACE_ID);
+
+        for (Terms.Bucket termsBucket : terms.getBuckets()) {
+            BasicTrace basicTrace = new BasicTrace();
+            // The bucket key (trace id) doubles as segment id; endpoint name and error flag are fixed placeholders.
+            basicTrace.setSegmentId(termsBucket.getKeyAsString());
+            Min startTime = termsBucket.getAggregations().get(START_TIME);
+            Max latency = termsBucket.getAggregations().get(LATENCY);
+            basicTrace.setStart(String.valueOf((long)startTime.getValue()));
+            basicTrace.getEndpointNames().add("");
+            basicTrace.setDuration((int)latency.getValue());
+            basicTrace.setError(false);
+            basicTrace.getTraceIds().add(termsBucket.getKeyAsString());
+            traceBrief.getTraces().add(basicTrace);
+        }
+
+        return traceBrief;
+    }
+
+    @Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
+        return Collections.emptyList(); // always empty here; this DAO returns spans through doFlexibleTraceQuery
+    }
+
+    /**
+     * Loads the Zipkin spans stored for one trace and converts them into SkyWalking query spans.
+     * Hits are sorted by start time ascending and the first hit is treated as the trace root.
+     *
+     * @param traceId trace id matched against the {@code TRACE_ID} field
+     * @return the converted spans (the search is capped at 1000 hits); empty when nothing matches
+     * @throws IOException declared for the underlying Elasticsearch search call
+     */
+    @Override public List<org.apache.skywalking.oap.server.core.query.entity.Span> doFlexibleTraceQuery(
+        String traceId) throws IOException {
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.termQuery(TRACE_ID, traceId));
+        sourceBuilder.sort(START_TIME, SortOrder.ASC);
+        sourceBuilder.size(1000);
+
+        SearchResponse response = getClient().search(ZipkinSpanRecord.INDEX_NAME, sourceBuilder);
+
+        List<org.apache.skywalking.oap.server.core.query.entity.Span> spanList = new ArrayList<>();
+
+        boolean isFirst = true;
+        for (SearchHit searchHit : response.getHits().getHits()) {
+            int serviceId = ((Number)searchHit.getSourceAsMap().get(SERVICE_ID)).intValue();
+            String dataBinaryBase64 = (String)searchHit.getSourceAsMap().get(SegmentRecord.DATA_BINARY);
+            Span span = SpanBytesDecoder.PROTO3.decodeOne(Base64.getDecoder().decode(dataBinaryBase64));
+
+            org.apache.skywalking.oap.server.core.query.entity.Span swSpan = new org.apache.skywalking.oap.server.core.query.entity.Span();
+            swSpan.setTraceId(span.traceId());
+            swSpan.setEndpointName(span.name());
+            // Zipkin reports microseconds; timestampAsLong() yields 0 instead of null when unset.
+            swSpan.setStartTime(span.timestampAsLong() / 1000);
+            swSpan.setEndTime(swSpan.getStartTime() + span.durationAsLong() / 1000);
+            span.tags().forEach((key, value) -> swSpan.getTags().add(new KeyValue(key, value)));
+            span.annotations().forEach(annotation -> {
+                LogEntity entity = new LogEntity();
+                entity.setTime(annotation.timestamp() / 1000);
+                entity.getData().add(new KeyValue("annotation", annotation.value()));
+                swSpan.getLogs().add(entity);
+            });
+            if (serviceId != Const.NONE) {
+                swSpan.setServiceCode(serviceInventoryCache.get(serviceId).getName());
+            }
+            swSpan.setSpanId(0);
+            swSpan.setParentSpanId(-1);
+            swSpan.setSegmentSpanId(span.id());
+            swSpan.setSegmentId(span.id());
+            // kind is optional in Zipkin, so compare instead of switching (switch on null throws).
+            Span.Kind kind = span.kind();
+            if (kind == Span.Kind.SERVER || kind == Span.Kind.CONSUMER) {
+                swSpan.setType("Entry"); // receiving side of a call or message
+            } else if (kind == Span.Kind.CLIENT || kind == Span.Kind.PRODUCER) {
+                swSpan.setType("Exit"); // outgoing call or message
+            } else {
+                swSpan.setType("Local");
+            }
+
+            if (isFirst) {
+                swSpan.setRoot(true);
+                swSpan.setSegmentParentSpanId("");
+                isFirst = false;
+            } else {
+                Ref ref = new Ref();
+                ref.setTraceId(span.traceId());
+                ref.setParentSegmentId(span.parentId());
+                ref.setType(RefType.CROSS_PROCESS);
+                ref.setParentSpanId(0);
+                swSpan.getRefs().add(ref);
+                swSpan.setSegmentParentSpanId(span.parentId());
+            }
+            spanList.add(swSpan);
+        }
+        return spanList;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
index 5f028bc..de8e186 100644
--- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -16,4 +16,4 @@
 #
 #
 
-org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.StorageModuleElasticsearchProvider
\ No newline at end of file
+org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.ZipkinStorageModuleElasticsearchProvider
\ No newline at end of file