You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/02/15 12:45:52 UTC

[skywalking] 01/01: Support day/hour/minute metrics merging into one index. Reduce the number of index 50%.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch es-idx-merging
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 6dee4fab6668a1e18a43debc543bafc2d0a2d87b
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Feb 15 20:45:39 2020 +0800

    Support day/hour/minute metrics merging into one index. Reduce the number of index 50%.
---
 dist-material/application.yml                      |   2 +
 .../src/main/resources/application.yml             |   2 +
 .../client/elasticsearch/ElasticSearchClient.java  |  89 ++++++-----
 .../client/elasticsearch/IndexNameConverter.java   |  26 ++++
 .../elasticsearch/ITElasticSearchClient.java       |  33 +++-
 .../server/receiver/trace/mock/AgentDataMock.java  | 105 +++++++++++++
 .../server/receiver/trace/mock/RegisterMock.java   | 113 ++++++++++++++
 .../server/receiver/trace/mock/ServiceAMock.java   | 118 +++++++++++++++
 .../server/receiver/trace/mock/ServiceBMock.java   | 166 +++++++++++++++++++++
 .../server/receiver/trace/mock/ServiceCMock.java   | 114 ++++++++++++++
 .../receiver/trace/mock/UniqueIdBuilder.java       |  39 +++++
 .../StorageModuleElasticsearchConfig.java          |  10 ++
 .../StorageModuleElasticsearchProvider.java        | 122 ++++++++++++---
 .../StorageModuleElasticsearch7Provider.java       |  71 +++++++--
 .../client/ElasticSearch7Client.java               |  65 +++++---
 15 files changed, 977 insertions(+), 98 deletions(-)

diff --git a/dist-material/application.yml b/dist-material/application.yml
index 25e165a..e6bad28 100644
--- a/dist-material/application.yml
+++ b/dist-material/application.yml
@@ -82,6 +82,7 @@ storage:
 #    protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
 #    trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
 #    trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
+#    enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
 #    user: ${SW_ES_USER:""}
 #    password: ${SW_ES_PASSWORD:""}
 #    indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
@@ -104,6 +105,7 @@ storage:
 #    protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
 #    trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
 #    trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
+#    enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
 #    user: ${SW_ES_USER:""}
 #    password: ${SW_ES_PASSWORD:""}
 #    indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index faaa7e3..9f06f78 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -83,6 +83,7 @@ storage:
 #    #trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
 #    user: ${SW_ES_USER:""}
 #    password: ${SW_ES_PASSWORD:""}
+#    enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
 #    indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
 #    indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
 #    # Those data TTL settings will override the same settings in core module.
@@ -104,6 +105,7 @@ storage:
     protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
     #trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
     #trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
+    enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
     user: ${SW_ES_USER:""}
     password: ${SW_ES_PASSWORD:""}
     indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 01abbeb..0f47abe 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
@@ -83,30 +84,34 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+/**
+ * ElasticSearchClient connects to the ES server by using ES client APIs.
+ */
+@Slf4j
 public class ElasticSearchClient implements Client {
-
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
-
     public static final String TYPE = "type";
     protected final String clusterNodes;
     protected final String protocol;
     private final String trustStorePath;
     private final String trustStorePass;
-    private final String namespace;
     private final String user;
     private final String password;
+    private final List<IndexNameConverter> indexNameConverters;
     protected RestHighLevelClient client;
 
-    public ElasticSearchClient(String clusterNodes, String protocol, String trustStorePath, String trustStorePass,
-        String namespace, String user, String password) {
+    public ElasticSearchClient(String clusterNodes,
+                               String protocol,
+                               String trustStorePath,
+                               String trustStorePass,
+                               String user,
+                               String password,
+                               List<IndexNameConverter> indexNameConverters) {
         this.clusterNodes = clusterNodes;
         this.protocol = protocol;
-        this.namespace = namespace;
         this.user = user;
         this.password = password;
+        this.indexNameConverters = indexNameConverters;
         this.trustStorePath = trustStorePath;
         this.trustStorePass = trustStorePass;
     }
@@ -127,7 +132,9 @@ public class ElasticSearchClient implements Client {
 
             if (StringUtil.isEmpty(trustStorePath)) {
                 builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
-                                    .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+                                    .setHttpClientConfigCallback(
+                                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+                                            credentialsProvider));
             } else {
                 KeyStore truststore = KeyStore.getInstance("jks");
                 try (InputStream is = Files.newInputStream(Paths.get(trustStorePath))) {
@@ -136,8 +143,10 @@ public class ElasticSearchClient implements Client {
                 SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
                 final SSLContext sslContext = sslBuilder.build();
                 builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
-                                    .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
-                                                                                                       .setSSLContext(sslContext));
+                                    .setHttpClientConfigCallback(
+                                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+                                            credentialsProvider)
+                                                                              .setSSLContext(sslContext));
             }
         } else {
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
@@ -153,7 +162,7 @@ public class ElasticSearchClient implements Client {
 
     public static List<HttpHost> parseClusterNodes(String protocol, String nodes) {
         List<HttpHost> httpHosts = new LinkedList<>();
-        logger.info("elasticsearch cluster nodes: {}", nodes);
+        log.info("elasticsearch cluster nodes: {}", nodes);
         List<String> nodesSplit = Splitter.on(",").omitEmptyStrings().splitToList(nodes);
 
         for (String node : nodesSplit) {
@@ -170,19 +179,19 @@ public class ElasticSearchClient implements Client {
 
         CreateIndexRequest request = new CreateIndexRequest(indexName);
         CreateIndexResponse response = client.indices().create(request);
-        logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+        log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
     public boolean createIndex(String indexName, Map<String, Object> settings,
-        Map<String, Object> mapping) throws IOException {
+                               Map<String, Object> mapping) throws IOException {
         indexName = formatIndexName(indexName);
         CreateIndexRequest request = new CreateIndexRequest(indexName);
         Gson gson = new Gson();
         request.settings(gson.toJson(settings), XContentType.JSON);
         request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON);
         CreateIndexResponse response = client.indices().create(request);
-        logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+        log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
@@ -193,7 +202,7 @@ public class ElasticSearchClient implements Client {
             Gson gson = new Gson();
             InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
             JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
-            logger.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
+            log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
             return new ArrayList<>(responseJson.keySet());
         }
         return Collections.emptyList();
@@ -227,7 +236,7 @@ public class ElasticSearchClient implements Client {
         DeleteIndexRequest request = new DeleteIndexRequest(indexName);
         DeleteIndexResponse response;
         response = client.indices().delete(request);
-        logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+        log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
         return response.isAcknowledged();
     }
 
@@ -249,12 +258,13 @@ public class ElasticSearchClient implements Client {
         } else if (statusCode == HttpStatus.SC_NOT_FOUND) {
             return false;
         } else {
-            throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
+            throw new IOException(
+                "The response status code of template exists request should be 200 or 404, but it is " + statusCode);
         }
     }
 
     public boolean createTemplate(String indexName, Map<String, Object> settings,
-        Map<String, Object> mapping) throws IOException {
+                                  Map<String, Object> mapping) throws IOException {
         indexName = formatIndexName(indexName);
 
         String[] patterns = new String[] {indexName + "-*"};
@@ -271,7 +281,8 @@ public class ElasticSearchClient implements Client {
         HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON);
 
         Response response = client.getLowLevelClient()
-                                  .performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
+                                  .performRequest(
+                                      HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
         return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
     }
 
@@ -313,14 +324,16 @@ public class ElasticSearchClient implements Client {
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
-        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+            indexName, id, source);
         request.version(version);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         client.update(request);
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
-        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+            indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         client.update(request);
     }
@@ -341,8 +354,9 @@ public class ElasticSearchClient implements Client {
         String jsonString = "{" + "  \"query\": {" + "    \"range\": {" + "      \"" + timeBucketColumnName + "\": {" + "        \"lte\": " + endTimeBucket + "      }" + "    }" + "  }" + "}";
         HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
         Response response = client.getLowLevelClient()
-                                  .performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
-        logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
+                                  .performRequest(
+                                      HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
+        log.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
         return response.getStatusLine().getStatusCode();
     }
 
@@ -353,9 +367,9 @@ public class ElasticSearchClient implements Client {
         try {
             int size = request.requests().size();
             BulkResponse responses = client.bulk(request);
-            logger.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
+            log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
         } catch (IOException e) {
-            logger.error(e.getMessage(), e);
+            log.error(e.getMessage(), e);
         }
     }
 
@@ -375,31 +389,34 @@ public class ElasticSearchClient implements Client {
             @Override
             public void beforeBulk(long executionId, BulkRequest request) {
                 int numberOfActions = request.numberOfActions();
-                logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
+                log.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
             }
 
             @Override
             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                 if (response.hasFailures()) {
-                    logger.warn("Bulk [{}] executed with failures", executionId);
+                    log.warn("Bulk [{}] executed with failures", executionId);
                 } else {
-                    logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook()
-                                                                                                                      .getMillis(), request
-                        .requests()
-                        .size());
+                    log.info(
+                        "Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook()
+                                                                                                              .getMillis(),
+                        request
+                            .requests()
+                            .size()
+                    );
                 }
             }
 
             @Override
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                logger.error("Failed to execute bulk", failure);
+                log.error("Failed to execute bulk", failure);
             }
         };
     }
 
     public String formatIndexName(String indexName) {
-        if (StringUtil.isNotEmpty(namespace)) {
-            return namespace + "_" + indexName;
+        for (final IndexNameConverter indexNameConverter : indexNameConverters) {
+            indexName = indexNameConverter.convert(indexName);
         }
         return indexName;
     }
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexNameConverter.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexNameConverter.java
new file mode 100644
index 0000000..faeca02
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexNameConverter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+/**
+ * Implementation supports the ElasticSearch index name converting.
+ */
+public interface IndexNameConverter {
+    String convert(String name);
+}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index 5f1abcc..c18bdff 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,7 +68,9 @@ public class ITElasticSearchClient {
     public void before() throws Exception {
         final String esAddress = System.getProperty("elastic.search.address");
         final String esProtocol = System.getProperty("elastic.search.protocol");
-        client = new ElasticSearchClient(esAddress, esProtocol, "", "", namespace, "test", "test");
+        client = new ElasticSearchClient(esAddress, esProtocol, "", "", "test", "test",
+                                         indexNameConverters(namespace)
+        );
         client.connect();
     }
 
@@ -270,12 +273,36 @@ public class ITElasticSearchClient {
                     index.add(oldIndexName.substring(namespacePrefix.length()), entry.getValue());
                     index.remove(oldIndexName);
                 } else {
-                    throw new RuntimeException("The indexName must contain the " + namespace + " prefix, but it is " + entry
-                        .getKey());
+                    throw new RuntimeException(
+                        "The indexName must contain the " + namespace + " prefix, but it is " + entry
+                            .getKey());
                 }
             });
             logger.info("UndoFormatIndexName after " + index.toString());
         }
         return index;
     }
+
+    private static List<IndexNameConverter> indexNameConverters(String namespace) {
+        List<IndexNameConverter> converters = new ArrayList<>();
+        converters.add(new NamespaceConverter(namespace));
+        return converters;
+    }
+
+    private static class NamespaceConverter implements IndexNameConverter {
+        private final String namespace;
+
+        public NamespaceConverter(final String namespace) {
+            this.namespace = namespace;
+        }
+
+        @Override
+        public String convert(final String indexName) {
+            if (StringUtil.isNotEmpty(namespace)) {
+                return namespace + "_" + indexName;
+            }
+
+            return indexName;
+        }
+    }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
new file mode 100644
index 0000000..b53ca2c
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
+
+public class AgentDataMock {
+
+    private static boolean IS_COMPLETED = false;
+
+    public static void main(String[] args) throws InterruptedException {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext().build();
+
+        RegisterMock registerMock = new RegisterMock(channel);
+
+        StreamObserver<UpstreamSegment> streamObserver = createStreamObserver();
+
+        UniqueId.Builder globalTraceId = UniqueIdBuilder.INSTANCE.create();
+        long startTimestamp = System.currentTimeMillis();
+        //long startTimestamp = new DateTime().minusDays(2).getMillis();
+
+        // ServiceAMock
+        ServiceAMock serviceAMock = new ServiceAMock(registerMock);
+        serviceAMock.register();
+
+        // ServiceBMock
+        ServiceBMock serviceBMock = new ServiceBMock(registerMock);
+        serviceBMock.register();
+
+        // ServiceCMock
+        ServiceCMock serviceCMock = new ServiceCMock(registerMock);
+        serviceCMock.register();
+
+        UniqueId.Builder serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
+        serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, true);
+
+        UniqueId.Builder serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
+        serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, true);
+
+        UniqueId.Builder serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
+        serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, true);
+
+        TimeUnit.SECONDS.sleep(10);
+
+        for (int i = 0; i < 500; i++) {
+            globalTraceId = UniqueIdBuilder.INSTANCE.create();
+            serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
+            serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
+            serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
+            serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, true);
+            serviceBMock.mock(
+                streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, true);
+            serviceCMock.mock(
+                streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, true);
+        }
+
+        streamObserver.onCompleted();
+        while (!IS_COMPLETED) {
+            TimeUnit.MILLISECONDS.sleep(500);
+        }
+    }
+
+    private static StreamObserver<UpstreamSegment> createStreamObserver() {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext().build();
+        TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub stub = TraceSegmentReportServiceGrpc.newStub(
+            channel);
+        return stub.collect(new StreamObserver<Commands>() {
+            @Override
+            public void onNext(Commands downstream) {
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+            }
+
+            @Override
+            public void onCompleted() {
+                IS_COMPLETED = true;
+            }
+        });
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
new file mode 100644
index 0000000..ed9c94d
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/RegisterMock.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import io.grpc.ManagedChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.common.KeyIntValuePair;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
+import org.apache.skywalking.apm.network.common.ServiceType;
+import org.apache.skywalking.apm.network.register.v2.RegisterGrpc;
+import org.apache.skywalking.apm.network.register.v2.Service;
+import org.apache.skywalking.apm.network.register.v2.ServiceInstance;
+import org.apache.skywalking.apm.network.register.v2.ServiceInstanceRegisterMapping;
+import org.apache.skywalking.apm.network.register.v2.ServiceInstances;
+import org.apache.skywalking.apm.network.register.v2.ServiceRegisterMapping;
+import org.apache.skywalking.apm.network.register.v2.Services;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author peng-yongsheng
+ */
+class RegisterMock {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterMock.class);
+
+    private final RegisterGrpc.RegisterBlockingStub registerStub;
+
+    RegisterMock(ManagedChannel channel) {
+        registerStub = RegisterGrpc.newBlockingStub(channel);
+    }
+
+    int registerService(String serviceName) throws InterruptedException {
+        Services.Builder services = Services.newBuilder();
+        services.addServices(Service
+                                 .newBuilder()
+                                 .setServiceName(serviceName)
+                                 .setType(ServiceType.normal))
+                .build();
+
+        ServiceRegisterMapping serviceRegisterMapping;
+        int serviceId = 0;
+        do {
+            serviceRegisterMapping = registerStub.doServiceRegister(services.build());
+
+            List<KeyIntValuePair> servicesList = serviceRegisterMapping.getServicesList();
+            if (servicesList.size() > 0) {
+                serviceId = servicesList.get(0).getValue();
+                logger.debug("service id: {}", serviceId);
+            }
+
+            TimeUnit.MILLISECONDS.sleep(20);
+        }
+        while (serviceId == 0);
+
+        return serviceId;
+    }
+
+    int registerServiceInstance(int serviceId, String agentName) throws InterruptedException {
+        ServiceInstances.Builder instances = ServiceInstances.newBuilder();
+
+        instances.addInstances(ServiceInstance.newBuilder()
+                                              .setServiceId(serviceId)
+                                              .setInstanceUUID(agentName)
+                                              .setTime(System.currentTimeMillis())
+                                              .addAllProperties(buildOSInfo())
+        );
+
+        ServiceInstanceRegisterMapping instanceMapping;
+        int instanceId = 0;
+        do {
+            instanceMapping = registerStub.doServiceInstanceRegister(instances.build());
+            List<KeyIntValuePair> serviceInstancesList = instanceMapping.getServiceInstancesList();
+            if (serviceInstancesList.size() > 0) {
+                instanceId = serviceInstancesList.get(0).getValue();
+                logger.debug("instance id: {}", instanceId);
+            }
+            TimeUnit.MILLISECONDS.sleep(20);
+        }
+        while (instanceId == 0);
+
+        return instanceId;
+    }
+
+    public static List<KeyStringValuePair> buildOSInfo() {
+        List<KeyStringValuePair> osInfo = new ArrayList<KeyStringValuePair>();
+
+        osInfo.add(KeyStringValuePair.newBuilder().setKey("os_name").setValue("osName").build());
+        osInfo.add(KeyStringValuePair.newBuilder().setKey("host_name").setValue("hostName").build());
+        osInfo.add(KeyStringValuePair.newBuilder().setKey("ipv4").setValue("ipv4").build());
+        osInfo.add(KeyStringValuePair.newBuilder().setKey("process_no").setValue("123").build());
+        osInfo.add(KeyStringValuePair.newBuilder().setKey("language").setValue("java").build());
+        return osInfo;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java
new file mode 100644
index 0000000..65ac83e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ServiceAMock {
+
+    static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest";
+    static String DUBBO_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()";
+    static String DUBBO_ADDRESS = "DubboIPAddress:1000";
+    private final RegisterMock registerMock;
+    private static int SERVICE_ID;
+    static int SERVICE_INSTANCE_ID;
+
+    ServiceAMock(RegisterMock registerMock) {
+        this.registerMock = registerMock;
+    }
+
+    void register() throws InterruptedException {
+        SERVICE_ID = registerMock.registerService("dubbox-consumer");
+        SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysA");
+    }
+
+    void mock(StreamObserver<UpstreamSegment> streamObserver, UniqueId.Builder traceId,
+        UniqueId.Builder segmentId, long startTimestamp, boolean isPrepare) {
+        UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
+        upstreamSegment.addGlobalTraceIds(traceId);
+        upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, isPrepare));
+
+        streamObserver.onNext(upstreamSegment.build());
+    }
+
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId, boolean isPrepare) {
+        SegmentObject.Builder segment = SegmentObject.newBuilder();
+        segment.setTraceSegmentId(segmentId);
+        segment.setServiceId(SERVICE_ID);
+        segment.setServiceInstanceId(SERVICE_INSTANCE_ID);
+        segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
+
+        return segment.build().toByteString();
+    }
+
+    private SpanObjectV2.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.Http);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp);
+        span.setEndTime(startTimestamp + 6000);
+        span.setComponentId(ComponentsDefine.TOMCAT.getId());
+        if (isPrepare) {
+            span.setOperationName(REST_ENDPOINT);
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObjectV2.Builder createLocalSpan(long startTimestamp, boolean isPrepare) {
+        SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+        span.setSpanId(1);
+        span.setSpanType(SpanType.Local);
+        span.setParentSpanId(0);
+        span.setStartTime(startTimestamp + 100);
+        span.setEndTime(startTimestamp + 500);
+        span.setOperationName("org.apache.skywalking.Local.do");
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObjectV2.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+        span.setSpanId(2);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 5800);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        span.setOperationName(DUBBO_ENDPOINT);
+        if (isPrepare) {
+            span.setPeer(DUBBO_ADDRESS);
+        } else {
+            span.setPeerId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
new file mode 100644
index 0000000..46aab9c
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.common.KeyStringValuePair;
+import org.apache.skywalking.apm.network.language.agent.RefType;
+import org.apache.skywalking.apm.network.language.agent.SpanLayer;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference;
+import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ServiceBMock {
+
+    private final RegisterMock registerMock;
+    private static int SERVICE_ID;
+    static int SERVICE_INSTANCE_ID;
+    static String DUBBO_PROVIDER_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetServiceImpl.doBusiness()";
+    static String ROCKET_MQ_ENDPOINT = "org.apache.skywalking.RocketMQ";
+    static String ROCKET_MQ_ADDRESS = "RocketMQAddress:2000";
+
+    ServiceBMock(RegisterMock registerMock) {
+        this.registerMock = registerMock;
+    }
+
+    void register() throws InterruptedException {
+        SERVICE_ID = registerMock.registerService("dubbox-provider");
+        SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysB");
+    }
+
+    void mock(StreamObserver<UpstreamSegment> streamObserver,
+              UniqueId.Builder traceId,
+              UniqueId.Builder segmentId,
+              UniqueId.Builder parentTraceSegmentId,
+              long startTimestamp,
+              boolean isPrepare) {
+        UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
+        upstreamSegment.addGlobalTraceIds(traceId);
+        upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId, isPrepare));
+
+        streamObserver.onNext(upstreamSegment.build());
+    }
+
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
+                                     UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        SegmentObject.Builder segment = SegmentObject.newBuilder();
+        segment.setTraceSegmentId(segmentId);
+        segment.setServiceId(SERVICE_ID);
+        segment.setServiceInstanceId(SERVICE_INSTANCE_ID);
+        segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare));
+        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMQExitSpan(startTimestamp, isPrepare));
+
+        return segment.build().toByteString();
+    }
+
+    private SegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        SegmentReference.Builder reference = SegmentReference.newBuilder();
+        reference.setParentTraceSegmentId(parentTraceSegmentId);
+        reference.setParentServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID);
+        reference.setParentSpanId(2);
+        reference.setEntryServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID);
+        reference.setRefType(RefType.CrossProcess);
+
+        if (isPrepare) {
+            reference.setParentEndpoint(ServiceAMock.REST_ENDPOINT);
+            reference.setNetworkAddress(ServiceAMock.DUBBO_ADDRESS);
+            reference.setEntryEndpoint(ServiceAMock.REST_ENDPOINT);
+        } else {
+            reference.setParentEndpointId(2);
+            reference.setNetworkAddressId(2);
+            reference.setEntryEndpointId(2);
+        }
+        return reference;
+    }
+
+    private SpanObjectV2.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
+        SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp + 500);
+        span.setEndTime(startTimestamp + 5000);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        span.setIsError(false);
+        span.addRefs(createReference(uniqueId, isPrepare));
+
+        if (isPrepare) {
+            span.setOperationName(ServiceBMock.DUBBO_PROVIDER_ENDPOINT);
+        } else {
+            span.setOperationNameId(4);
+        }
+        return span;
+    }
+
+    private SpanObjectV2.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+        span.setSpanId(1);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.Database);
+        span.setParentSpanId(0);
+        span.setStartTime(startTimestamp + 550);
+        span.setEndTime(startTimestamp + 1500);
+        span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
+        span.setIsError(true);
+        span.addTags(KeyStringValuePair.newBuilder()
+                                       .setKey("db.statement")
+                                       .setValue("select * from database where complex = 1;")
+                                       .build());
+        span.addTags(KeyStringValuePair.newBuilder().setKey("db.type").setValue("mongodb").build());
+
+        span.setOperationName(
+            "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
+        if (isPrepare) {
+            span.setPeer("localhost:27017");
+        } else {
+            span.setPeerId(3);
+        }
+        return span;
+    }
+
+    private SpanObjectV2.Builder createMQExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+        span.setSpanId(2);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 1100);
+        span.setEndTime(startTimestamp + 1500);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_PRODUCER.getId());
+        span.setIsError(false);
+
+        span.setOperationName(ROCKET_MQ_ENDPOINT);
+        if (isPrepare) {
+            span.setPeer(ROCKET_MQ_ADDRESS);
+        } else {
+            span.setPeerId(4);
+        }
+        return span;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceCMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceCMock.java
new file mode 100644
index 0000000..17897c8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceCMock.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.language.agent.RefType;
+import org.apache.skywalking.apm.network.language.agent.SpanLayer;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference;
+import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author peng-yongsheng
+ */
+class ServiceCMock {
+
+    private final RegisterMock registerMock;
+    private static int SERVICE_ID;
+    private static int SERVICE_INSTANCE_ID;
+
+    ServiceCMock(RegisterMock registerMock) {
+        this.registerMock = registerMock;
+    }
+
+    void register() throws InterruptedException {
+        SERVICE_ID = registerMock.registerService("rocket-mq-consumer");
+        SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysC");
+    }
+
+    void mock(StreamObserver<UpstreamSegment> streamObserver,
+              UniqueId.Builder traceId,
+              UniqueId.Builder segmentId,
+              UniqueId.Builder parentTraceSegmentId,
+              long startTimestamp,
+              boolean isPrepare) {
+        UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
+        upstreamSegment.addGlobalTraceIds(traceId);
+        upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId, isPrepare));
+
+        streamObserver.onNext(upstreamSegment.build());
+    }
+
+    private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
+                                     UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        SegmentObject.Builder segment = SegmentObject.newBuilder();
+        segment.setTraceSegmentId(segmentId);
+        segment.setServiceInstanceId(SERVICE_INSTANCE_ID);
+        segment.setServiceId(SERVICE_ID);
+        segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare));
+
+        return segment.build().toByteString();
+    }
+
+    private SpanObjectV2.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
+        SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp + 3000);
+        span.setEndTime(startTimestamp + 5000);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
+        span.setIsError(false);
+        span.addRefs(createReference(uniqueId, isPrepare));
+
+        if (isPrepare) {
+            span.setOperationName(ServiceBMock.ROCKET_MQ_ENDPOINT);
+        } else {
+            span.setOperationNameId(5);
+        }
+        return span;
+    }
+
+    private SegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
+        SegmentReference.Builder reference = SegmentReference.newBuilder();
+        reference.setParentTraceSegmentId(parentTraceSegmentId);
+        reference.setParentServiceInstanceId(ServiceBMock.SERVICE_INSTANCE_ID);
+        reference.setParentSpanId(2);
+        reference.setEntryServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID);
+        reference.setRefType(RefType.CrossProcess);
+
+        if (isPrepare) {
+            reference.setParentEndpoint(ServiceBMock.DUBBO_PROVIDER_ENDPOINT);
+            reference.setNetworkAddress(ServiceBMock.ROCKET_MQ_ADDRESS);
+            reference.setEntryEndpoint(ServiceAMock.REST_ENDPOINT);
+        } else {
+            reference.setParentEndpointId(8);
+            reference.setNetworkAddressId(4);
+            reference.setEntryEndpointId(2);
+        }
+        return reference;
+    }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
new file mode 100644
index 0000000..e64adff
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/UniqueIdBuilder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.mock;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum UniqueIdBuilder {
+    INSTANCE;
+
+    private AtomicLong idPart = new AtomicLong(1);
+
+    UniqueId.Builder create() {
+        UniqueId.Builder uniqueId = UniqueId.newBuilder();
+        uniqueId.addIdParts(idPart.getAndIncrement());
+        uniqueId.addIdParts(idPart.getAndIncrement());
+        uniqueId.addIdParts(idPart.getAndIncrement());
+        return uniqueId;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index dd7de6a..2035bec 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -55,6 +55,16 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
     @Getter
     @Setter
     String trustStorePass;
+    /**
+     * If this is ON, downsampling indexes(hour and day precisions) merged into minute precision. In this case, only
+     * {@link #minuteMetricsDataTTL} works for minute, hour and day.
+     *
+     * @since 7.0.0 This is an enhancement. Reduce 50% of index number(remove day/hour index requirements) but keep the
+     * performance nearly same as before. Only one side-effect for 6.x storage is just day/hour indexes remain, users
+     * need to remove them manually.
+     */
+    @Getter
+    private boolean enablePackedDownsampling = true;
     @Setter
     private int resultWindowMaxSize = 10000;
     @Setter
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index eaf115e..8983431 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -23,8 +23,14 @@ import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
@@ -48,6 +54,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
 import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
@@ -106,38 +113,54 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         if (!StringUtil.isEmpty(config.getNameSpace())) {
             config.setNameSpace(config.getNameSpace().toLowerCase());
         }
-        elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
-            .getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
+        elasticSearchClient = new ElasticSearchClient(
+            config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
+            .getTrustStorePass(), config.getUser(), config.getPassword(),
+            indexNameConverters(config.getNameSpace(), config.isEnablePackedDownsampling())
+        );
 
-        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
-            .getFlushInterval(), config.getConcurrentRequests()));
+        this.registerServiceImplementation(
+            IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
+                .getFlushInterval(), config.getConcurrentRequests()));
         this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
-        this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL()));
+        this.registerServiceImplementation(
+            IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient,
+                                                            new ElasticsearchStorageTTL()
+            ));
 
-        this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient, config
-            .getResultWindowMaxSize()));
-        this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient, config
-            .getResultWindowMaxSize()));
-        this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient, config
-            .getResultWindowMaxSize()));
+        this.registerServiceImplementation(
+            IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient, config
+                .getResultWindowMaxSize()));
+        this.registerServiceImplementation(
+            IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient, config
+                .getResultWindowMaxSize()));
+        this.registerServiceImplementation(
+            IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
+        this.registerServiceImplementation(
+            INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient, config
+                .getResultWindowMaxSize()));
 
         this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
-        this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
+        this.registerServiceImplementation(
+            ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
+        this.registerServiceImplementation(
+            IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
         this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
 
-        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
-            .getProfileTaskQueryMaxSize()));
-        this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
-            .getProfileTaskQueryMaxSize()));
-        this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
-            .getProfileTaskQueryMaxSize()));
+        this.registerServiceImplementation(
+            IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
+        this.registerServiceImplementation(
+            IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
+        this.registerServiceImplementation(
+            IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
     }
 
     @Override
@@ -174,4 +197,63 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         configService.getDataTTLConfig().setDayMetricsDataTTL(config.getDayMetricsDataTTL());
         configService.getDataTTLConfig().setMonthMetricsDataTTL(config.getMonthMetricsDataTTL());
     }
+
+    public static List<IndexNameConverter> indexNameConverters(String namespace, boolean enablePackedDownsampling) {
+        List<IndexNameConverter> converters = new ArrayList<>();
+
+        if (enablePackedDownsampling) {
+            // Packed downsampling converter.
+            converters.add(new PackedDownsamplingConverter(enablePackedDownsampling));
+            converters.add(new NamespaceConverter(namespace));
+        }
+        return converters;
+    }
+
+    private static class PackedDownsamplingConverter implements IndexNameConverter {
+        private final boolean enablePackedDownsampling;
+        private final String[] removableSuffixes = new String[] {
+            Const.ID_SPLIT + Downsampling.Day.getName(),
+            Const.ID_SPLIT + Downsampling.Hour.getName()
+        };
+        private final Map<String, String> convertedIndexNames = new ConcurrentHashMap<>();
+
+        public PackedDownsamplingConverter(final boolean enablePackedDownsampling) {
+            this.enablePackedDownsampling = enablePackedDownsampling;
+        }
+
+        @Override
+        public String convert(final String indexName) {
+            String convertedName = convertedIndexNames.get(indexName);
+            if (convertedName != null) {
+                return convertedName;
+            }
+            convertedName = indexName;
+            for (final String removableSuffix : removableSuffixes) {
+                String mayReplaced = indexName.replaceAll(removableSuffix, "");
+                if (mayReplaced.length() != convertedName.length()) {
+                    convertedName = mayReplaced;
+                    break;
+                }
+            }
+            convertedIndexNames.put(indexName, convertedName);
+            return convertedName;
+        }
+    }
+
+    private static class NamespaceConverter implements IndexNameConverter {
+        private final String namespace;
+
+        public NamespaceConverter(final String namespace) {
+            this.namespace = namespace;
+        }
+
+        @Override
+        public String convert(final String indexName) {
+            if (StringUtil.isNotEmpty(namespace)) {
+                return namespace + "_" + indexName;
+            }
+
+            return indexName;
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index 377ab26..99a96c4 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -76,6 +76,8 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.Metr
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.ProfileThreadSnapshotQueryEs7DAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.TraceQueryEs7DAO;
 
+import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider.indexNameConverters;
+
 public class StorageModuleElasticsearch7Provider extends ModuleProvider {
 
     protected final StorageModuleElasticsearch7Config config;
@@ -106,31 +108,68 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
         if (!StringUtil.isEmpty(config.getNameSpace())) {
             config.setNameSpace(config.getNameSpace().toLowerCase());
         }
-        elasticSearch7Client = new ElasticSearch7Client(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
-            .getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
-
-        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
+        elasticSearch7Client = new ElasticSearch7Client(
+            config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
+            .getTrustStorePass(), config.getUser(), config.getPassword(),
+            indexNameConverters(config.getNameSpace(), config.isEnablePackedDownsampling())
+        );
+
+        this.registerServiceImplementation(
+            IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(),
+                                                   config.getFlushInterval(), config.getConcurrentRequests()
+            ));
         this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
         this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
-        this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client, new ElasticsearchStorageTTL()));
-
-        this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
-        this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
-        this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
-        this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
+        this.registerServiceImplementation(
+            IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client,
+                                                            new ElasticsearchStorageTTL()
+            ));
+
+        this.registerServiceImplementation(
+            IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(
+                elasticSearch7Client,
+                config.getResultWindowMaxSize()
+            ));
+        this.registerServiceImplementation(
+            IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(
+                elasticSearch7Client,
+                config.getResultWindowMaxSize()
+            ));
+        this.registerServiceImplementation(
+            IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
+        this.registerServiceImplementation(
+            INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(
+                elasticSearch7Client,
+                config.getResultWindowMaxSize()
+            ));
 
         this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
         this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
-        this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
-        this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
-        this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
+        this.registerServiceImplementation(
+            ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
+        this.registerServiceImplementation(
+            IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
+        this.registerServiceImplementation(
+            IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
         this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
         this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
 
-        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
-        this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
-        this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
+        this.registerServiceImplementation(
+            IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(
+                elasticSearch7Client,
+                config.getProfileTaskQueryMaxSize()
+            ));
+        this.registerServiceImplementation(
+            IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(
+                elasticSearch7Client,
+                config.getProfileTaskQueryMaxSize()
+            ));
+        this.registerServiceImplementation(
+            IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(
+                elasticSearch7Client,
+                config.getProfileTaskQueryMaxSize()
+            ));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
index cd2003b..87b62d8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
@@ -18,9 +18,19 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client;
 
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 import org.elasticsearch.action.admin.indices.alias.Alias;
@@ -55,23 +65,21 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 public class ElasticSearch7Client extends ElasticSearchClient {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Client.class);
 
-    public ElasticSearch7Client(final String clusterNodes, final String protocol, final String trustStorePath,
-        final String trustStorePass, final String namespace, final String user, final String password) {
-        super(clusterNodes, protocol, trustStorePath, trustStorePass, namespace, user, password);
+    public ElasticSearch7Client(final String clusterNodes,
+                                final String protocol,
+                                final String trustStorePath,
+                                final String trustStorePass,
+                                final String user,
+                                final String password,
+                                List<IndexNameConverter> indexNameConverters) {
+        super(
+            clusterNodes, protocol, trustStorePath, trustStorePass, user, password,
+            indexNameConverters
+        );
     }
 
     @Override
@@ -91,7 +99,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
     }
 
     public boolean createIndex(String indexName, Map<String, Object> settings,
-        Map<String, Object> mapping) throws IOException {
+                               Map<String, Object> mapping) throws IOException {
         indexName = formatIndexName(indexName);
         CreateIndexRequest request = new CreateIndexRequest(indexName);
         request.settings(settings);
@@ -138,11 +146,13 @@ public class ElasticSearch7Client extends ElasticSearchClient {
     }
 
     public boolean createTemplate(String indexName, Map<String, Object> settings,
-        Map<String, Object> mapping) throws IOException {
+                                  Map<String, Object> mapping) throws IOException {
         indexName = formatIndexName(indexName);
 
-        PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexName).patterns(Collections.singletonList(indexName + "-*"))
-                                                                                                .alias(new Alias(indexName))
+        PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexName).patterns(
+            Collections.singletonList(indexName + "-*"))
+                                                                                                .alias(new Alias(
+                                                                                                    indexName))
                                                                                                 .settings(settings)
                                                                                                 .mapping(mapping);
 
@@ -157,7 +167,8 @@ public class ElasticSearch7Client extends ElasticSearchClient {
 
         DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(indexName);
         AcknowledgedResponse acknowledgedResponse = client.indices()
-                                                          .deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT);
+                                                          .deleteTemplate(
+                                                              deleteIndexTemplateRequest, RequestOptions.DEFAULT);
 
         return acknowledgedResponse.isAcknowledged();
     }
@@ -190,8 +201,9 @@ public class ElasticSearch7Client extends ElasticSearchClient {
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source, long seqNo,
-        long primaryTerm) throws IOException {
-        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+                            long primaryTerm) throws IOException {
+        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+            indexName, id, source);
         request.setIfSeqNo(seqNo);
         request.setIfPrimaryTerm(primaryTerm);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -199,7 +211,8 @@ public class ElasticSearch7Client extends ElasticSearchClient {
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
-        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
+        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
+            indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         client.update(request, RequestOptions.DEFAULT);
     }
@@ -221,7 +234,10 @@ public class ElasticSearch7Client extends ElasticSearchClient {
         deleteByQueryRequest.setAbortOnVersionConflict(false);
         deleteByQueryRequest.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
         BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
-        logger.debug("delete indexName: {}, by query request: {}, response: {}", indexName, deleteByQueryRequest, bulkByScrollResponse);
+        logger.debug(
+            "delete indexName: {}, by query request: {}, response: {}", indexName, deleteByQueryRequest,
+            bulkByScrollResponse
+        );
         return HttpStatus.SC_OK;
     }
 
@@ -241,7 +257,10 @@ public class ElasticSearch7Client extends ElasticSearchClient {
     public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
         BulkProcessor.Listener listener = createBulkListener();
 
-        return BulkProcessor.builder((bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener), listener)
+        return BulkProcessor.builder(
+            (bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT,
+                                                                          bulkResponseActionListener
+            ), listener)
                             .setBulkActions(bulkActions)
                             .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
                             .setConcurrentRequests(concurrentRequests)