You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/19 12:11:15 UTC

[skywalking] branch master updated: Upgrade the InfluxDB storage-plugin to protocol V3 (#4641)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 28530cd  Upgrade the InfluxDB storage-plugin to protocol V3 (#4641)
28530cd is described below

commit 28530cd79d6d91443c0584fade1cbb79e137a8ed
Author: Daming <zt...@foxmail.com>
AuthorDate: Sun Apr 19 20:10:59 2020 +0800

    Upgrade the InfluxDB storage-plugin to protocol V3 (#4641)
    
    
    Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
    Co-authored-by: kezhenxu94 <ke...@163.com>
    Co-authored-by: kezhenxu94 <ke...@apache.org>
---
 .github/workflows/e2e.cluster.yaml                 |   2 +-
 .github/workflows/e2e.profiling.yaml               |   2 +-
 .github/workflows/e2e.storages.yaml                |   2 +-
 .github/workflows/e2e.ttl.yaml                     |   2 +-
 .github/workflows/plugins-test.1.yaml              |   1 -
 .github/workflows/plugins-test.3.yaml              |   1 +
 .../storage/plugin/influxdb/InfluxClient.java      |  20 +-
 ...luxModelConstants.java => InfluxConstants.java} |  32 ++-
 .../plugin/influxdb/InfluxStorageProvider.java     |  48 ++--
 ...AddressAlias.java => InfluxTableInstaller.java} |  28 +-
 .../storage/plugin/influxdb/TableMetaInfo.java     |  68 ++++-
 .../plugin/influxdb/base/InfluxInsertRequest.java  |  14 +-
 .../storage/plugin/influxdb/base/MetricsDAO.java   |  44 ++--
 .../plugin/influxdb/base/NoneStreamDAO.java        |  21 +-
 .../storage/plugin/influxdb/base/RecordDAO.java    |  16 +-
 .../plugin/influxdb/query/AggregationQuery.java    |  18 +-
 .../influxdb/query/InfluxMetadataQueryDAO.java     | 146 -----------
 .../storage/plugin/influxdb/query/LogQuery.java    |  16 +-
 .../plugin/influxdb/query/MetadataQuery.java       | 290 +++++++++++++++++++++
 .../plugin/influxdb/query/MetricsQuery.java        |  40 +--
 .../influxdb/query/NetworkAddressAliasDAO.java     |  86 ++++++
 .../plugin/influxdb/query/ProfileTaskLogQuery.java |  27 +-
 .../plugin/influxdb/query/ProfileTaskQuery.java    |  51 ++--
 .../influxdb/query/ProfileThreadSnapshotQuery.java |  28 +-
 .../plugin/influxdb/query/TopNRecordsQuery.java    |   8 +-
 .../plugin/influxdb/query/TopologyQuery.java       | 148 ++++++-----
 .../storage/plugin/influxdb/query/TraceQuery.java  |  10 +-
 .../docker/profile/docker-compose.influxdb.yml     |   4 -
 test/plugin/run.sh                                 |   5 +-
 .../scenarios/mysql-scenario/support-version.list  |  11 +-
 .../scenarios/solrj-7.x-scenario/configuration.yml |   1 +
 31 files changed, 782 insertions(+), 408 deletions(-)

diff --git a/.github/workflows/e2e.cluster.yaml b/.github/workflows/e2e.cluster.yaml
index b1eb89f..d2f82f2 100644
--- a/.github/workflows/e2e.cluster.yaml
+++ b/.github/workflows/e2e.cluster.yaml
@@ -37,7 +37,7 @@ jobs:
     strategy:
       matrix:
         coordinator: ['zk']
-        storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
+        storage: ['mysql', 'es6', 'es7', 'influxdb']
     env:
       SW_COORDINATOR: ${{ matrix.coordinator }}
       SW_STORAGE: ${{ matrix.storage }}
diff --git a/.github/workflows/e2e.profiling.yaml b/.github/workflows/e2e.profiling.yaml
index 04de176..229468d 100644
--- a/.github/workflows/e2e.profiling.yaml
+++ b/.github/workflows/e2e.profiling.yaml
@@ -36,7 +36,7 @@ jobs:
     timeout-minutes: 90
     strategy:
       matrix:
-        storage: ['h2', 'mysql', 'es6', 'es7'] #TODO: 'influxdb'
+        storage: ['h2', 'mysql', 'es6', 'es7', 'influxdb']
     env:
       SW_STORAGE: ${{ matrix.storage }}
     steps:
diff --git a/.github/workflows/e2e.storages.yaml b/.github/workflows/e2e.storages.yaml
index c346f4d..c5c380b 100644
--- a/.github/workflows/e2e.storages.yaml
+++ b/.github/workflows/e2e.storages.yaml
@@ -36,7 +36,7 @@ jobs:
     timeout-minutes: 90
     strategy:
       matrix:
-        storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
+        storage: ['mysql', 'es6', 'es7', 'influxdb']
     env:
       SW_STORAGE: ${{ matrix.storage }}
     steps:
diff --git a/.github/workflows/e2e.ttl.yaml b/.github/workflows/e2e.ttl.yaml
index 8bce3fa..c1dce82 100644
--- a/.github/workflows/e2e.ttl.yaml
+++ b/.github/workflows/e2e.ttl.yaml
@@ -36,7 +36,7 @@ jobs:
     timeout-minutes: 90
     strategy:
       matrix:
-        storage: ['es6', 'es7'] #TODO: 'influxdb'
+        storage: ['es6', 'es7', 'influxdb']
     env:
       SW_STORAGE: ${{ matrix.storage }}
     steps:
diff --git a/.github/workflows/plugins-test.1.yaml b/.github/workflows/plugins-test.1.yaml
index b9fc6c2..a9a46b7 100644
--- a/.github/workflows/plugins-test.1.yaml
+++ b/.github/workflows/plugins-test.1.yaml
@@ -46,7 +46,6 @@ jobs:
           - { name: 'kotlin-coroutine-scenario', title: 'Kotlin Coroutine 1.0.1-1.3.3 (4)' }
           - { name: 'lettuce-scenario', title: 'Lettuce 5.x (17)' }
           - { name: 'mongodb-3.x-scenario', title: 'Mongodb 3.4.0-3.11.1 (22)' }
-          - { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (53)' }
           - { name: 'netty-socketio-scenario', title: 'Netty-SocketIO 1.x (4)' }
     steps:
       - uses: actions/checkout@v2
diff --git a/.github/workflows/plugins-test.3.yaml b/.github/workflows/plugins-test.3.yaml
index 4870180..80af4c7 100644
--- a/.github/workflows/plugins-test.3.yaml
+++ b/.github/workflows/plugins-test.3.yaml
@@ -33,6 +33,7 @@ jobs:
       fail-fast: true
       matrix:
         case:
+          - { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (30)' }
           - { name: 'undertow-scenario', title: 'Undertow 1.3.0-2.0.27 (23)' }
           - { name: 'webflux-scenario', title: 'Spring-WebFlux 2.x (7)' }
           - { name: 'zookeeper-scenario', title: 'Zookeeper 3.4.x (14)' }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java
index cc4c06e..fe96dd4 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.OkHttpClient;
@@ -73,6 +74,7 @@ public class InfluxClient implements Client {
                                          InfluxDB.ResponseFormat.MSGPACK
         );
         influx.query(new Query("CREATE DATABASE " + database));
+        influx.enableGzip();
 
         influx.enableBatch(config.getActions(), config.getDuration(), TimeUnit.MILLISECONDS);
         influx.setDatabase(database);
@@ -99,7 +101,7 @@ public class InfluxClient implements Client {
         }
 
         try {
-            QueryResult result = getInflux().query(query);
+            QueryResult result = getInflux().query(new Query(query.getCommand()));
             if (result.hasError()) {
                 throw new IOException(result.getError());
             }
@@ -137,6 +139,22 @@ public class InfluxClient implements Client {
     }
 
     /**
+     * Execute a query against InfluxDB with a `select count(*)` statement and return the count only.
+     *
+     * @throws IOException if there is an error on the InfluxDB server or communication error
+     */
+    public int getCounter(Query query) throws IOException {
+        QueryResult.Series series = queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
+        if (Objects.isNull(series)) {
+            return 0;
+        }
+        return ((Number) series.getValues().get(0).get(1)).intValue();
+    }
+
+    /**
      * Data management, to drop a time-series by measurement and time-series name specified. If an exception isn't
      * thrown, it means execution success. Notice, drop series don't support to drop series by range
      *
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
similarity index 62%
rename from oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java
rename to oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
index 18dd47d..33bc7db 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
@@ -18,9 +18,31 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.influxdb;
 
-public interface InfluxModelConstants {
-    /**
-     * Override column because the 'duration' is the identifier of InfluxDB.
-     */
-    String DURATION = "dur";
+public interface InfluxConstants {
+    String ID_COLUMN = "id";
+
+    String NAME = "\"name\"";
+
+    String ALL_FIELDS = "*::field";
+
+    String SORT_DES = "top";
+
+    String SORT_ASC = "bottom";
+
+    String DURATION = "\"" + "duration" + "\"";
+
+    interface TagName {
+
+        String ID_COLUMN = "_id";
+
+        String NAME = "_name";
+
+        String ENTITY_ID = "_entity_id";
+
+        String TIME_BUCKET = "_time_bucket";
+
+        String NODE_TYPE = "_node_type";
+
+        String SERVICE_ID = "_service_id";
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
index e501b4f..ae67748 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
@@ -46,10 +47,10 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.HistoryDele
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.InfluxStorageDAO;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AggregationQuery;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AlarmQuery;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxMetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxNetworkAddressAlias;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.LogQuery;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetadataQuery;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetricsQuery;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.NetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskLogQuery;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskQuery;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileThreadSnapshotQuery;
@@ -60,7 +61,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.TraceQuery
 @Slf4j
 public class InfluxStorageProvider extends ModuleProvider {
     private InfluxStorageConfig config;
-    private InfluxClient influxClient;
+    private InfluxClient client;
 
     public InfluxStorageProvider() {
         config = new InfluxStorageConfig();
@@ -83,35 +84,42 @@ public class InfluxStorageProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException {
-        influxClient = new InfluxClient(config);
+        client = new InfluxClient(config);
 
-        this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(influxClient));
-        this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(influxClient));
+        this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(client));
+        this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(client));
 
-        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new InfluxNetworkAddressAlias(influxClient));
-        this.registerServiceImplementation(IMetadataQueryDAO.class, new InfluxMetadataQueryDAO(influxClient));
+        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new NetworkAddressAliasDAO(client));
+        this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQuery(client));
 
-        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(influxClient));
-        this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(influxClient));
-        this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(influxClient));
-        this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(influxClient));
-        this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(influxClient));
-        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(influxClient));
-        this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(influxClient));
+        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(client));
+        this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(client));
+        this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(client));
+        this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(client));
+        this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(client));
+        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(client));
+        this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(client));
 
-        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(influxClient));
+        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(client));
         this.registerServiceImplementation(
-            IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(influxClient));
+            IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(client));
         this.registerServiceImplementation(
-            IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(influxClient, config.getFetchTaskLogMaxSize()));
+            IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(client, config.getFetchTaskLogMaxSize()));
 
         this.registerServiceImplementation(
-            IHistoryDeleteDAO.class, new HistoryDeleteDAO(influxClient));
+            IHistoryDeleteDAO.class, new HistoryDeleteDAO(client));
     }
 
     @Override
     public void start() throws ServiceNotProvidedException, ModuleStartException {
-        influxClient.connect();
+        client.connect();
+
+        InfluxTableInstaller installer = new InfluxTableInstaller(getManager());
+        try {
+            installer.install(client);
+        } catch (StorageException e) {
+            throw new ModuleStartException(e.getMessage(), e);
+        }
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java
similarity index 52%
rename from oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java
rename to oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java
index 90b47e2..585df52 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java
@@ -16,22 +16,28 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+package org.apache.skywalking.oap.server.storage.plugin.influxdb;
 
-import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
-public class InfluxNetworkAddressAlias implements INetworkAddressAliasDAO {
-    private InfluxClient client;
+public class InfluxTableInstaller extends ModelInstaller {
 
-    public InfluxNetworkAddressAlias(final InfluxClient client) {
-        this.client = client;
+    public InfluxTableInstaller(ModuleManager moduleManager) {
+        super(moduleManager);
     }
 
     @Override
-    public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
-        return null;
+    protected boolean isExists(final Client client, final Model model) throws StorageException {
+        TableMetaInfo.addModel(model);
+        return true;
+    }
+
+    @Override
+    protected void createTable(final Client client, final Model model) throws StorageException {
+        // Automatically create table
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
index 4cc9e80..3d73663 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
@@ -18,18 +18,80 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.influxdb;
 
+import com.google.common.collect.Maps;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 
+@Getter
+@Builder
+@AllArgsConstructor
 public class TableMetaInfo {
-    private static Map<String, Model> TABLES = new HashMap<>();
+    private static final Map<String, TableMetaInfo> TABLES = new HashMap<>();
+
+    private Map<String, String> storageAndColumnMap;
+    private Map<String, String> storageAndTagMap;
+    private Model model;
 
     public static void addModel(Model model) {
-        TABLES.put(model.getName(), model);
+        final List<ModelColumn> columns = model.getColumns();
+        final Map<String, String> storageAndTagMap = Maps.newHashMap();
+        final Map<String, String> storageAndColumnMap = Maps.newHashMap();
+        columns.forEach(column -> {
+            ColumnName columnName = column.getColumnName();
+            storageAndColumnMap.put(columnName.getStorageName(), columnName.getName());
+        });
+
+        if (model.getName().endsWith("_traffic")) {
+            // instance_traffic name, service_id
+            // endpoint_traffic name, service_id
+            storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME);
+            if (InstanceTraffic.INDEX_NAME.equals(model.getName())
+                || EndpointTraffic.INDEX_NAME.equals(model.getName())) {
+                storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
+            } else {
+                // service_traffic  name, node_type
+                storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
+            }
+        } else {
+
+            // Specifies ENTITY_ID, TIME_BUCKET, NODE_TYPE, SERVICE_ID as tag
+            if (storageAndColumnMap.containsKey(Metrics.ENTITY_ID)) {
+                storageAndTagMap.put(Metrics.ENTITY_ID, InfluxConstants.TagName.ENTITY_ID);
+            }
+            if (storageAndColumnMap.containsKey(Record.TIME_BUCKET)) {
+                storageAndTagMap.put(Record.TIME_BUCKET, InfluxConstants.TagName.TIME_BUCKET);
+            }
+            if (storageAndColumnMap.containsKey(ServiceTraffic.NODE_TYPE)) {
+                storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
+            }
+            if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) {
+                storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
+            }
+        }
+
+        TableMetaInfo info = TableMetaInfo.builder()
+                                          .model(model)
+                                          .storageAndTagMap(storageAndTagMap)
+                                          .storageAndColumnMap(storageAndColumnMap)
+                                          .build();
+        TABLES.put(model.getName(), info);
     }
 
-    public static Model get(String moduleName) {
+    public static TableMetaInfo get(String moduleName) {
         return TABLES.get(moduleName);
     }
+
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
index eaa0f4d..f984372 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
 import com.google.common.collect.Maps;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -29,15 +28,13 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.influxdb.dto.Point;
 
 /**
  * InfluxDB Point wrapper.
  */
 public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
-    public static final String ID = "id";
-
     private Point.Builder builder;
     private Map<String, Object> fields = Maps.newHashMap();
 
@@ -57,9 +54,8 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
             }
         }
         builder = Point.measurement(model.getName())
-                       .addField(ID, storageData.id())
-                       .fields(fields)
-                       .tag(InfluxClient.TAG_TIME_BUCKET, String.valueOf(fields.get(Metrics.TIME_BUCKET)));
+                       .addField(InfluxConstants.ID_COLUMN, storageData.id())
+                       .fields(fields);
     }
 
     public InfluxInsertRequest time(long time, TimeUnit unit) {
@@ -68,9 +64,7 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
     }
 
     public InfluxInsertRequest addFieldAsTag(String fieldName, String tagName) {
-        if (fields.containsKey(fieldName)) {
-            builder.tag(tagName, String.valueOf(fields.get(fieldName)));
-        }
+        builder.tag(tagName, String.valueOf(fields.get(fieldName)));
         return this;
     }
 
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java
index eb3008f..86f5c8c 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java
@@ -26,30 +26,27 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.querybuilder.SelectQueryImpl;
 import org.influxdb.querybuilder.WhereQueryImpl;
 
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
 
+@Slf4j
 public class MetricsDAO implements IMetricsDAO {
-    public static final String TAG_ENTITY_ID = "_entity_id";
-    public static final String TAG_ENDPOINT_OWNER_SERVICE = "_service_id";
-    public static final String TAG_ENDPOINT_NAME = "_endpoint_name";
 
     private final StorageBuilder<Metrics> storageBuilder;
     private final InfluxClient client;
@@ -62,10 +59,13 @@ public class MetricsDAO implements IMetricsDAO {
     @Override
     public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
         WhereQueryImpl<SelectQueryImpl> query = select()
-            .regex("*::field")
+            .raw(ALL_FIELDS)
             .from(client.getDatabase(), model.getName())
             .where(contains("id", Joiner.on("|").join(ids)));
         QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
 
         if (series == null) {
             return Collections.emptyList();
@@ -73,10 +73,9 @@ public class MetricsDAO implements IMetricsDAO {
 
         final List<Metrics> metrics = Lists.newArrayList();
         List<String> columns = series.getColumns();
-        Map<String, String> storageAndColumnNames = Maps.newHashMap();
-        for (ModelColumn column : model.getColumns()) {
-            storageAndColumnNames.put(column.getColumnName().getStorageName(), column.getColumnName().getName());
-        }
+
+        TableMetaInfo metaInfo = TableMetaInfo.get(model.getName());
+        Map<String, String> storageAndColumnMap = metaInfo.getStorageAndColumnMap();
 
         series.getValues().forEach(values -> {
             Map<String, Object> data = Maps.newHashMap();
@@ -87,7 +86,7 @@ public class MetricsDAO implements IMetricsDAO {
                     value = ((StorageDataComplexObject) value).toStorageData();
                 }
 
-                data.put(storageAndColumnNames.get(columns.get(i)), value);
+                data.put(storageAndColumnMap.get(columns.get(i)), value);
             }
             metrics.add(storageBuilder.map2Data(data));
 
@@ -99,16 +98,15 @@ public class MetricsDAO implements IMetricsDAO {
     @Override
     public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
         final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling());
-        if (metrics instanceof EndpointTraffic || metrics instanceof ServiceTraffic || metrics instanceof InstanceTraffic) {
-            return new InfluxInsertRequest(model, metrics, storageBuilder)
-                .time(timestamp, TimeUnit.MILLISECONDS)
-                .addFieldAsTag(EndpointTraffic.SERVICE_ID, TAG_ENDPOINT_OWNER_SERVICE)
-                .addFieldAsTag(EndpointTraffic.NAME, TAG_ENDPOINT_NAME);
-        } else {
-            return new InfluxInsertRequest(model, metrics, storageBuilder)
-                .time(timestamp, TimeUnit.MILLISECONDS)
-                .addFieldAsTag(Metrics.ENTITY_ID, TAG_ENTITY_ID);
-        }
+        TableMetaInfo tableMetaInfo = TableMetaInfo.get(model.getName());
+
+        final InfluxInsertRequest request = new InfluxInsertRequest(model, metrics, storageBuilder)
+            .time(timestamp, TimeUnit.MILLISECONDS);
+
+        tableMetaInfo.getStorageAndTagMap().forEach((field, tag) -> {
+            request.addFieldAsTag(field, tag);
+        });
+        return request;
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java
index ae00288..3d89e41 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java
@@ -23,15 +23,13 @@ import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.influxdb.dto.Point;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
 
 public class NoneStreamDAO implements INoneStreamDAO {
-    public static final String TAG_SERVICE_ID = "_service_id";
     private static final int PADDING_SIZE = 1_000_000;
     private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
 
@@ -45,13 +43,14 @@ public class NoneStreamDAO implements INoneStreamDAO {
 
     @Override
     public void insert(final Model model, final NoneStream noneStream) throws IOException {
-        final long timestamp = TimeBucket.getTimestamp(
-            noneStream.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement();
-
-        Point point = new InfluxInsertRequest(model, noneStream, storageBuilder)
-            .time(timestamp, TimeUnit.NANOSECONDS)
-            .addFieldAsTag(ProfileTaskRecord.SERVICE_ID, TAG_SERVICE_ID).getPoint();
-
-        client.write(point);
+        final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling())
+            * PADDING_SIZE + SUFFIX.getAndIncrement();
+
+        final InfluxInsertRequest request = new InfluxInsertRequest(model, noneStream, storageBuilder)
+            .time(timestamp, TimeUnit.NANOSECONDS);
+        TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
+            request.addFieldAsTag(field, tag);
+        });
+        client.write(request.getPoint());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
index fae2e41..63739c1 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
@@ -22,16 +22,15 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
 
 public class RecordDAO implements IRecordDAO {
-    public static final String TAG_SERVICE_ID = "_service_id";
     private static final int PADDING_SIZE = 1_000_000;
     private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
 
@@ -45,11 +44,14 @@ public class RecordDAO implements IRecordDAO {
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-        final long timestamp = TimeBucket.getTimestamp(
-            record.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement();
+        final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())
+            * PADDING_SIZE + SUFFIX.getAndIncrement();
 
-        return new InfluxInsertRequest(model, record, storageBuilder)
-            .time(timestamp, TimeUnit.NANOSECONDS)
-            .addFieldAsTag(SegmentRecord.SERVICE_ID, TAG_SERVICE_ID);
+        final InfluxInsertRequest request = new InfluxInsertRequest(model, record, storageBuilder)
+            .time(timestamp, TimeUnit.NANOSECONDS);
+        TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
+            request.addFieldAsTag(field, tag);
+        });
+        return request;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java
index 2ab6b21..48cd1ef 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java
@@ -25,13 +25,11 @@ import java.util.Comparator;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.query.entity.Order;
 import org.apache.skywalking.oap.server.core.query.entity.TopNEntity;
 import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.querybuilder.SelectQueryImpl;
 import org.influxdb.querybuilder.SelectSubQueryImpl;
@@ -68,7 +66,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
                                                    long startTB, long endTB, Order order) throws IOException {
         return getTopNEntity(
             downsampling, indName,
-            subQuery(InstanceTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
+            subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
         );
     }
 
@@ -84,7 +82,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
                                             long startTB, long endTB, Order order) throws IOException {
         return getTopNEntity(
             downsampling, indName,
-            subQuery(EndpointTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
+            subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
         );
     }
 
@@ -95,14 +93,14 @@ public class AggregationQuery implements IAggregationQueryDAO {
                                            int topN) throws IOException {
         // Have to re-sort here. Because the function, top()/bottom(), get the result ordered by the `time`.
         Comparator<TopNEntity> comparator = DESCENDING;
-        String functionName = "top";
+        String functionName = InfluxConstants.SORT_DES;
         if (order == Order.ASC) {
-            functionName = "bottom";
+            functionName = InfluxConstants.SORT_ASC;
             comparator = ASCENDING;
         }
 
         SelectQueryImpl query = select().function(functionName, "mean", topN).as("value")
-                                        .column(MetricsDAO.TAG_ENTITY_ID)
+                                        .column(InfluxConstants.TagName.ENTITY_ID)
                                         .from(client.getDatabase(), measurement);
         query.setSubQuery(subQuery);
 
@@ -135,7 +133,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
                        .and(eq(serviceColumnName, serviceId))
                        .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
                        .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)))
-                       .groupBy(MetricsDAO.TAG_ENTITY_ID);
+                       .groupBy(InfluxConstants.TagName.ENTITY_ID);
     }
 
     private SelectSubQueryImpl<SelectQueryImpl> subQuery(String name, String columnName, long startTB, long endTB) {
@@ -143,7 +141,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
                        .where()
                        .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
                        .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)))
-                       .groupBy(MetricsDAO.TAG_ENTITY_ID);
+                       .groupBy(InfluxConstants.TagName.ENTITY_ID);
     }
 
     private static final Comparator<TopNEntity> ASCENDING = Comparator.comparingLong(TopNEntity::getValue);
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java
deleted file mode 100644
index b300f85..0000000
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
-
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.query.entity.Database;
-import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
-import org.apache.skywalking.oap.server.core.query.entity.Service;
-import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-import org.influxdb.querybuilder.SelectQueryImpl;
-import org.influxdb.querybuilder.WhereQueryImpl;
-
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
-
-public class InfluxMetadataQueryDAO implements IMetadataQueryDAO {
-    private InfluxClient client;
-    // 'name' is InfluxDB keyword, so escapes it
-    private static final String ENDPOINT_NAME = '\"' + EndpointTraffic.NAME + '\"';
-
-    public InfluxMetadataQueryDAO(final InfluxClient client) {
-        this.client = client;
-    }
-
-    @Override
-    public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException {
-        return 0;
-    }
-
-    @Override
-    public int numOfEndpoint() throws IOException {
-        final SelectQueryImpl selectQuery = select()
-            .count(EndpointTraffic.ENTITY_ID)
-            .from(client.getDatabase(), EndpointTraffic.INDEX_NAME);
-
-        Query query = new Query(selectQuery.getCommand());
-
-        final QueryResult.Series series = client.queryForSingleSeries(query);
-        if (series == null) {
-            return 0;
-        }
-
-        return ((Number) series.getValues().get(0).get(1)).intValue();
-    }
-
-    @Override
-    public int numOfConjectural(final int nodeTypeValue) throws IOException {
-        return 0;
-    }
-
-    @Override
-    public List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException {
-        return null;
-    }
-
-    @Override
-    public List<Service> getAllBrowserServices(final long startTimestamp, final long endTimestamp) throws IOException {
-        return null;
-    }
-
-    @Override
-    public List<Database> getAllDatabases() throws IOException {
-        return null;
-    }
-
-    @Override
-    public List<Service> searchServices(final long startTimestamp,
-                                        final long endTimestamp,
-                                        final String keyword) throws IOException {
-        return null;
-    }
-
-    @Override
-    public Service searchService(final String serviceCode) throws IOException {
-        return null;
-    }
-
-    @Override
-    public List<Endpoint> searchEndpoint(final String keyword,
-                                         final String serviceId,
-                                         final int limit) throws IOException {
-        WhereQueryImpl<SelectQueryImpl> endpointQuery = select()
-            .column(EndpointTraffic.SERVICE_ID)
-            .column(ENDPOINT_NAME)
-            .from(client.getDatabase(), EndpointTraffic.INDEX_NAME)
-            .where();
-        endpointQuery.where(eq(MetricsDAO.TAG_ENDPOINT_OWNER_SERVICE, String.valueOf(serviceId)));
-        if (!Strings.isNullOrEmpty(keyword)) {
-            endpointQuery.where(contains(MetricsDAO.TAG_ENDPOINT_NAME, keyword.replaceAll("/", "\\\\/")));
-        }
-        endpointQuery.limit(limit);
-
-        Query query = new Query(endpointQuery.getCommand());
-
-        final QueryResult.Series series = client.queryForSingleSeries(query);
-
-        List<Endpoint> list = new ArrayList<>(limit);
-        if (series != null) {
-            series.getValues().forEach(values -> {
-                EndpointTraffic endpointTraffic = new EndpointTraffic();
-                endpointTraffic.setServiceId((String) values.get(1));
-                endpointTraffic.setName((String) values.get(2));
-
-                Endpoint endpoint = new Endpoint();
-                endpoint.setId(IDManager.EndpointID.buildId(endpointTraffic.getServiceId(), endpointTraffic.getName()));
-                endpoint.setName(endpointTraffic.getName());
-                list.add(endpoint);
-            });
-        }
-        return list;
-    }
-
-    @Override
-    public List<ServiceInstance> getServiceInstances(final long startTimestamp,
-                                                     final long endTimestamp,
-                                                     final String serviceId) throws IOException {
-        return null;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java
index fe69ac0..6b6019d 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java
@@ -32,8 +32,9 @@ import org.apache.skywalking.oap.server.core.query.entity.LogState;
 import org.apache.skywalking.oap.server.core.query.entity.Logs;
 import org.apache.skywalking.oap.server.core.query.entity.Pagination;
 import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.elasticsearch.common.Strings;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
@@ -51,6 +52,7 @@ import static org.apache.skywalking.oap.server.core.analysis.manual.log.Abstract
 import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.STATUS_CODE;
 import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TIMESTAMP;
 import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TRACE_ID;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
@@ -68,11 +70,11 @@ public class LogQuery implements ILogQueryDAO {
     public Logs queryLogs(String metricName, int serviceId, int serviceInstanceId, String endpointId, String traceId,
                           LogState state, String stateCode, Pagination paging, int from, int limit,
                           long startTB, long endTB) throws IOException {
-        WhereQueryImpl<SelectQueryImpl> recallQuery = select().regex("*::field")
+        WhereQueryImpl<SelectQueryImpl> recallQuery = select().raw(ALL_FIELDS)
                                                               .from(client.getDatabase(), metricName)
                                                               .where();
         if (serviceId != Const.NONE) {
-            recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId)));
+            recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId)));
         }
         if (serviceInstanceId != Const.NONE) {
             recallQuery.and(eq(SERVICE_INSTANCE_ID, serviceInstanceId));
@@ -131,8 +133,12 @@ public class LogQuery implements ILogQueryDAO {
                 Map<String, Object> data = Maps.newHashMap();
                 Log log = new Log();
 
-                for (int i = 0; i < columns.size(); i++) {
-                    data.put(columns.get(i), values.get(i));
+                for (int i = 1; i < columns.size(); i++) {
+                    Object value = values.get(i);
+                    if (value instanceof StorageDataComplexObject) {
+                        value = ((StorageDataComplexObject) value).toStorageData();
+                    }
+                    data.put(columns.get(i), value);
                 }
                 log.setContent((String) data.get(CONTENT));
                 log.setContentType(ContentType.instanceOf((int) data.get(CONTENT_TYPE)));
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java
new file mode 100644
index 0000000..bd04197
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.entity.Attribute;
+import org.apache.skywalking.oap.server.core.query.entity.Database;
+import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
+import org.apache.skywalking.oap.server.core.query.entity.Language;
+import org.apache.skywalking.oap.server.core.query.entity.LanguageTrans;
+import org.apache.skywalking.oap.server.core.query.entity.Service;
+import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.SelectSubQueryImpl;
+import org.influxdb.querybuilder.WhereQueryImpl;
+import org.influxdb.querybuilder.WhereSubQueryImpl;
+
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.NAME;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.TagName;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+
+@Slf4j
+public class MetadataQuery implements IMetadataQueryDAO {
+    private static final Gson GSON = new Gson();
+    private final InfluxClient client;
+
+    public MetadataQuery(final InfluxClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException {
+        WhereQueryImpl query = select().raw("count(distinct " + ID_COLUMN + ")")
+                                       .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+                                       .where()
+                                       .and(
+                                           eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())
+                                           ));
+        return client.getCounter(query);
+    }
+
+    @Override
+    public int numOfEndpoint() throws IOException {
+        SelectQueryImpl query = select()
+            .raw("count(distinct " + ID_COLUMN + ")")
+            .from(client.getDatabase(), EndpointTraffic.INDEX_NAME);
+        return client.getCounter(query);
+    }
+
+    @Override
+    public int numOfConjectural(final int nodeTypeValue) throws IOException {
+        WhereQueryImpl<SelectQueryImpl> query = select().raw("count(distinct " + ID_COLUMN + ")")
+                                                        .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+                                                        .where(eq(
+                                                            InfluxConstants.TagName.NODE_TYPE,
+                                                            String.valueOf(nodeTypeValue)
+                                                        ));
+        return client.getCounter(query);
+    }
+
+    @Override
+    public List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException {
+        SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
+            .fromSubQuery(client.getDatabase())
+            .column(ID_COLUMN).column(NAME)
+            .from(ServiceTraffic.INDEX_NAME)
+            .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())))
+            .groupBy(TagName.NAME, TagName.NODE_TYPE);
+        SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
+        query.setSubQuery(subQuery);
+        return buildServices(query);
+    }
+
+    @Override
+    public List<Service> getAllBrowserServices(long startTimestamp, long endTimestamp) throws IOException {
+        WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME)
+            .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+            .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
+        return buildServices(query);
+    }
+
+    @Override
+    public List<Database> getAllDatabases() throws IOException {
+        SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
+            .fromSubQuery(client.getDatabase())
+            .column(ID_COLUMN).column(NAME)
+            .from(ServiceTraffic.INDEX_NAME)
+            .where(eq(InfluxConstants.TagName.NODE_TYPE, NodeType.Database.value()))
+            .groupBy(TagName.NAME, TagName.NODE_TYPE);
+        SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
+        query.setSubQuery(subQuery);
+        QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
+
+        List<Database> databases = Lists.newArrayList();
+        if (Objects.nonNull(series)) {
+            for (List<Object> values : series.getValues()) {
+                Database database = new Database();
+                database.setId((String) values.get(1));
+                database.setName((String) values.get(2));
+                databases.add(database);
+            }
+        }
+        return databases;
+    }
+
+    @Override
+    public List<Service> searchServices(long startTimestamp, long endTimestamp, String keyword) throws IOException {
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+            .fromSubQuery(client.getDatabase())
+            .column(ID_COLUMN)
+            .column(NAME)
+            .from(ServiceTraffic.INDEX_NAME)
+            .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
+        if (!Strings.isNullOrEmpty(keyword)) {
+            subQuery.and(contains(ServiceTraffic.NAME, keyword));
+        }
+        subQuery.groupBy(TagName.NAME, TagName.NODE_TYPE);
+
+        SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
+        query.setSubQuery(subQuery);
+        return buildServices(query);
+    }
+
+    @Override
+    public Service searchService(String serviceCode) throws IOException {
+        WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME)
+            .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+            .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
+        query.and(eq(ServiceTraffic.NODE_TYPE, serviceCode));
+        return buildServices(query).get(0);
+    }
+
+    @Override
+    public List<Endpoint> searchEndpoint(final String keyword,
+                                         final String serviceId,
+                                         final int limit) throws IOException {
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+            .fromSubQuery(client.getDatabase())
+            .column(ID_COLUMN)
+            .column(NAME)
+            .from(EndpointTraffic.INDEX_NAME)
+            .where(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId)));
+        if (!Strings.isNullOrEmpty(keyword)) {
+            subQuery.where(contains(EndpointTraffic.NAME, keyword.replaceAll("/", "\\\\/")));
+        }
+        subQuery.groupBy(TagName.NAME, TagName.SERVICE_ID);
+        SelectQueryImpl query = select(ID_COLUMN, NAME)
+            .from(client.getDatabase());
+        query.setSubQuery(subQuery);
+        query.limit(limit);
+
+        final QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
+
+        List<Endpoint> list = new ArrayList<>(limit);
+        if (series != null) {
+            series.getValues().forEach(values -> {
+                Endpoint endpoint = new Endpoint();
+                endpoint.setId((String) values.get(1));
+                endpoint.setName((String) values.get(2));
+                list.add(endpoint);
+            });
+        }
+        return list;
+    }
+
+    @Override
+    public List<ServiceInstance> getServiceInstances(final long startTimestamp,
+                                                     final long endTimestamp,
+                                                     final String serviceId) throws IOException {
+        final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
+
+        SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
+            .fromSubQuery(client.getDatabase())
+            .column(ID_COLUMN).column(NAME).column(InstanceTraffic.PROPERTIES)
+            .from(InstanceTraffic.INDEX_NAME)
+            .where()
+            .and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket))
+            .and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId))
+            .groupBy(TagName.NAME, TagName.SERVICE_ID);
+
+        SelectQueryImpl query = select().column(ID_COLUMN)
+                                        .column(NAME)
+                                        .column(InstanceTraffic.PROPERTIES)
+                                        .from(client.getDatabase(), InstanceTraffic.INDEX_NAME);
+        query.setSubQuery(subQuery);
+
+        QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
+
+        if (Objects.isNull(series)) {
+            return Collections.EMPTY_LIST;
+        }
+
+        List<List<Object>> result = series.getValues();
+        List<ServiceInstance> instances = Lists.newArrayList();
+        for (List<Object> values : result) {
+            ServiceInstance serviceInstance = new ServiceInstance();
+
+            serviceInstance.setId((String) values.get(1));
+            serviceInstance.setName((String) values.get(2));
+            serviceInstance.setInstanceUUID(serviceInstance.getId());
+
+            String propertiesString = (String) values.get(3);
+            if (!Strings.isNullOrEmpty(propertiesString)) {
+                JsonObject properties = GSON.fromJson(propertiesString, JsonObject.class);
+                for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
+                    String key = property.getKey();
+                    String value = property.getValue().getAsString();
+                    if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
+                        serviceInstance.setLanguage(LanguageTrans.INSTANCE.value(value));
+                    } else {
+                        serviceInstance.getAttributes().add(new Attribute(key, value));
+                    }
+
+                }
+            } else {
+                serviceInstance.setLanguage(Language.UNKNOWN);
+            }
+            instances.add(serviceInstance);
+        }
+        return instances;
+    }
+
+    private List<Service> buildServices(Query query) throws IOException {
+        QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
+
+        ArrayList<Service> services = Lists.newArrayList();
+        if (Objects.nonNull(series)) {
+            for (List<Object> values : series.getValues()) {
+                Service service = new Service();
+                service.setId((String) values.get(1));
+                service.setName((String) values.get(2));
+                services.add(service);
+            }
+        }
+        return services;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
index 09b2107..a28f1c6 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
@@ -38,14 +38,16 @@ import org.apache.skywalking.oap.server.core.query.sql.KeyValues;
 import org.apache.skywalking.oap.server.core.query.sql.Where;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.querybuilder.SelectQueryImpl;
 import org.influxdb.querybuilder.SelectionQueryImpl;
 import org.influxdb.querybuilder.WhereQueryImpl;
 
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
@@ -74,7 +76,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
         WhereQueryImpl<SelectQueryImpl> queryWhereQuery = query.from(client.getDatabase(), measurement).where();
 
         Map<String, Class<?>> columnTypes = Maps.newHashMap();
-        for (ModelColumn column : TableMetaInfo.get(measurement).getColumns()) {
+        for (ModelColumn column : TableMetaInfo.get(measurement).getModel().getColumns()) {
             columnTypes.put(column.getColumnName().getStorageName(), column.getType());
         }
 
@@ -84,6 +86,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
             StringBuilder clauseBuilder = new StringBuilder();
             for (KeyValues kv : whereKeyValues) {
                 final List<String> values = kv.getValues();
+                ids.addAll(values);
 
                 Class<?> type = columnTypes.get(kv.getKey());
                 if (values.size() == 1) {
@@ -93,16 +96,15 @@ public class MetricsQuery implements IMetricsQueryDAO {
                     }
                     clauseBuilder.append(kv.getKey()).append("=").append(value).append(" OR ");
                 } else {
-                    ids.addAll(values);
                     if (type == String.class) {
                         clauseBuilder.append(kv.getKey())
                                      .append(" =~ /")
                                      .append(Joiner.on("|").join(values))
                                      .append("/ OR ");
-                        continue;
-                    }
-                    for (String value : values) {
-                        clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR ");
+                    } else {
+                        for (String value : values) {
+                            clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR ");
+                        }
                     }
                 }
             }
@@ -111,17 +113,17 @@ public class MetricsQuery implements IMetricsQueryDAO {
         queryWhereQuery
             .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB, downsampling)))
             .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB, downsampling)))
-            .groupBy(MetricsDAO.TAG_ENTITY_ID);
+            .groupBy(InfluxConstants.TagName.ENTITY_ID);
 
         IntValues intValues = new IntValues();
         List<QueryResult.Series> seriesList = client.queryForSeries(queryWhereQuery);
         if (log.isDebugEnabled()) {
             log.debug("SQL: {} result set: {}", queryWhereQuery.getCommand(), seriesList);
         }
-        if (!(seriesList == null || seriesList.isEmpty())) {
+        if (CollectionUtils.isNotEmpty(seriesList)) {
             for (QueryResult.Series series : seriesList) {
                 KVInt kv = new KVInt();
-                kv.setId(series.getTags().get(MetricsDAO.TAG_ENTITY_ID));
+                kv.setId(series.getTags().get(InfluxConstants.TagName.ENTITY_ID));
                 Number value = (Number) series.getValues().get(0).get(1);
                 kv.setValue(value.longValue());
 
@@ -139,16 +141,16 @@ public class MetricsQuery implements IMetricsQueryDAO {
                                         String valueCName)
         throws IOException {
         WhereQueryImpl<SelectQueryImpl> query = select()
-            .column("id")
+            .column(ID_COLUMN)
             .column(valueCName)
             .from(client.getDatabase(), measurement)
             .where();
 
-        if (ids != null && !ids.isEmpty()) {
+        if (CollectionUtils.isNotEmpty(ids)) {
             if (ids.size() == 1) {
-                query.where(eq("id", ids.get(0)));
+                query.where(eq(ID_COLUMN, ids.get(0)));
             } else {
-                query.where(contains("id", Joiner.on("|").join(ids)));
+                query.where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
             }
         }
         List<QueryResult.Series> seriesList = client.queryForSeries(query);
@@ -157,7 +159,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
         }
 
         IntValues intValues = new IntValues();
-        if (!(seriesList == null || seriesList.isEmpty())) {
+        if (CollectionUtils.isNotEmpty(seriesList)) {
             seriesList.get(0).getValues().forEach(values -> {
                 KVInt kv = new KVInt();
                 kv.setValue(((Number) values.get(2)).longValue());
@@ -197,7 +199,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
             .from(client.getDatabase(), measurement)
             .where();
 
-        if (ids != null && !ids.isEmpty()) {
+        if (CollectionUtils.isNotEmpty(ids)) {
             if (ids.size() == 1) {
                 query.where(eq("id", ids.get(0)));
             } else {
@@ -212,7 +214,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
         for (int i = 0; i < intValues.length; i++) {
             intValues[i] = new IntValues();
         }
-        if (series == null || series.isEmpty()) {
+        if (CollectionUtils.isEmpty(series)) {
             return intValues;
         }
         series.get(0).getValues().forEach(values -> {
@@ -253,9 +255,9 @@ public class MetricsQuery implements IMetricsQueryDAO {
             .column(ThermodynamicMetrics.STEP)
             .column(ThermodynamicMetrics.NUM_OF_STEPS)
             .column(ThermodynamicMetrics.DETAIL_GROUP)
-            .column("id")
+            .column(ID_COLUMN)
             .from(client.getDatabase(), measurement)
-            .where(contains("id", Joiner.on("|").join(ids)));
+            .where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
         Map<String, List<Long>> thermodynamicValueMatrix = new HashMap<>();
 
         QueryResult.Series series = client.queryForSingleSeries(query);
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java
new file mode 100644
index 0000000..ef60af8
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
+import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.WhereQueryImpl;
+
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+
+@Slf4j
+public class NetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+    private final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder();
+    private InfluxClient client;
+
+    public NetworkAddressAliasDAO(final InfluxClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
+        List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>();
+
+        WhereQueryImpl<SelectQueryImpl> query = select().raw(InfluxConstants.ALL_FIELDS)
+                                                        .from(client.getDatabase(), NetworkAddressAlias.INDEX_NAME)
+                                                        .where(gte(
+                                                            NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET,
+                                                            timeBucket
+                                                        ));
+        try {
+            QueryResult.Series series = client.queryForSingleSeries(query);
+            if (log.isDebugEnabled()) {
+                log.debug("SQL: {} result: {}", query.getCommand(), series);
+            }
+            if (Objects.isNull(series)) {
+                return networkAddressAliases;
+            }
+
+            List<List<Object>> result = series.getValues();
+            List<String> columns = series.getColumns();
+
+            Map<String, String> columnAndFieldMap = TableMetaInfo.get(NetworkAddressAlias.INDEX_NAME)
+                                                                 .getStorageAndColumnMap();
+            for (List<Object> values : result) {
+                Map<String, Object> map = Maps.newHashMap();
+                for (int i = 1; i < columns.size(); i++) {
+                    map.put(columnAndFieldMap.get(columns.get(i)), values.get(i));
+                }
+                networkAddressAliases.add(builder.map2Data(map));
+            }
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+
+        return networkAddressAliases;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java
index cfac9db..59982d8 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java
@@ -19,17 +19,16 @@
 package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
 import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
 import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.querybuilder.SelectQueryImpl;
 import org.influxdb.querybuilder.WhereQueryImpl;
@@ -49,8 +48,8 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
     @Override
     public List<ProfileTaskLog> getTaskLogList() throws IOException {
         WhereQueryImpl<SelectQueryImpl> query = select()
-            .function("top", ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize)
-            .column("id")
+            .function(InfluxConstants.SORT_DES, ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize)
+            .column(InfluxConstants.ID_COLUMN)
             .column(ProfileTaskLogRecord.TASK_ID)
             .column(ProfileTaskLogRecord.INSTANCE_ID)
             .column(ProfileTaskLogRecord.OPERATION_TIME)
@@ -65,26 +64,18 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
         if (series == null) {
             return Collections.emptyList();
         }
-        List<String> columns = series.getColumns();
-        Map<String, Integer> columnsMap = Maps.newHashMap();
-        for (int i = 0; i < columns.size(); i++) {
-            columnsMap.put(columns.get(i), i);
-        }
-
-        List<ProfileTaskLog> taskLogs = Lists.newArrayList();
+        final List<ProfileTaskLog> taskLogs = Lists.newArrayList();
         series.getValues().stream()
               // re-sort by self, because of the result order by time.
               .sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue()))
               .forEach(values -> {
                   taskLogs.add(ProfileTaskLog.builder()
-                                             .id((String) values.get(columnsMap.get("id")))
-                                             .taskId((String) values.get(columnsMap.get(ProfileTaskLogRecord.TASK_ID)))
-                                             .instanceId(
-                                                 (String) values.get(columnsMap.get(ProfileTaskLogRecord.INSTANCE_ID)))
-                                             .operationTime(
-                                                 (Long) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TIME)))
+                                             .id((String) values.get(2))
+                                             .taskId((String) values.get(3))
+                                             .instanceId((String) values.get(4))
+                                             .operationTime(((Number) values.get(5)).longValue())
                                              .operationType(ProfileTaskLogOperationType.parse(
-                                                 (int) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TYPE))))
+                                                 ((Number) values.get(6)).intValue()))
                                              .build());
               });
         return taskLogs;
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java
index db2b2f2..43099ef 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java
@@ -22,13 +22,13 @@ import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
 import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxModelConstants;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.NoneStreamDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.querybuilder.SelectQueryImpl;
 import org.influxdb.querybuilder.WhereQueryImpl;
@@ -38,8 +38,9 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
 
+@Slf4j
 public class ProfileTaskQuery implements IProfileTaskQueryDAO {
-    private InfluxClient client;
+    private final InfluxClient client;
 
     public ProfileTaskQuery(InfluxClient client) {
         this.client = client;
@@ -52,19 +53,22 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
                                          final Long endTimeBucket,
                                          final Integer limit) throws IOException {
         WhereQueryImpl<SelectQueryImpl> query =
-            select("id", ProfileTaskRecord.SERVICE_ID,
-                   ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
-                   ProfileTaskRecord.CREATE_TIME,
-                   InfluxModelConstants.DURATION,
-                   ProfileTaskRecord.MIN_DURATION_THRESHOLD,
-                   ProfileTaskRecord.DUMP_PERIOD,
-                   ProfileTaskRecord.MAX_SAMPLING_COUNT
+            select(
+                InfluxConstants.ID_COLUMN,
+                ProfileTaskRecord.SERVICE_ID,
+                ProfileTaskRecord.ENDPOINT_NAME,
+                ProfileTaskRecord.START_TIME,
+                ProfileTaskRecord.CREATE_TIME,
+                InfluxConstants.DURATION,
+                ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+                ProfileTaskRecord.DUMP_PERIOD,
+                ProfileTaskRecord.MAX_SAMPLING_COUNT
             )
                 .from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME)
                 .where();
 
         if (StringUtil.isNotEmpty(serviceId)) {
-            query.and(eq(NoneStreamDAO.TAG_SERVICE_ID, serviceId));
+            query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
         }
         if (StringUtil.isNotEmpty(endpointName)) {
             query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
@@ -81,6 +85,9 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
 
         List<ProfileTask> tasks = Lists.newArrayList();
         QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
         if (series != null) {
             series.getValues().forEach(values -> {
                 tasks.add(profileTaskBuilder(values));
@@ -94,20 +101,26 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
         if (StringUtil.isEmpty(id)) {
             return null;
         }
-        SelectQueryImpl query = select("id", ProfileTaskRecord.SERVICE_ID,
-                                       ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
-                                       ProfileTaskRecord.CREATE_TIME,
-                                       InfluxModelConstants.DURATION,
-                                       ProfileTaskRecord.MIN_DURATION_THRESHOLD,
-                                       ProfileTaskRecord.DUMP_PERIOD,
-                                       ProfileTaskRecord.MAX_SAMPLING_COUNT
+        SelectQueryImpl query = select(
+            InfluxConstants.ID_COLUMN,
+            ProfileTaskRecord.SERVICE_ID,
+            ProfileTaskRecord.ENDPOINT_NAME,
+            ProfileTaskRecord.START_TIME,
+            ProfileTaskRecord.CREATE_TIME,
+            InfluxConstants.DURATION,
+            ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+            ProfileTaskRecord.DUMP_PERIOD,
+            ProfileTaskRecord.MAX_SAMPLING_COUNT
         )
             .from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME)
             .where()
-            .and(eq("id", id))
+            .and(eq(InfluxConstants.ID_COLUMN, id))
             .limit(1);
 
         QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
         if (Objects.nonNull(series)) {
             return profileTaskBuilder(series.getValues().get(0));
         }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java
index 36a9937..25dfc0e 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java
@@ -26,6 +26,8 @@ import java.util.Base64;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
@@ -33,6 +35,7 @@ import org.apache.skywalking.oap.server.core.query.entity.BasicTrace;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.elasticsearch.common.Strings;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.querybuilder.WhereQueryImpl;
@@ -43,6 +46,7 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
 
+@Slf4j
 public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDAO {
     private final InfluxClient client;
 
@@ -60,7 +64,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
 
         final LinkedList<String> segments = new LinkedList<>();
         QueryResult.Series series = client.queryForSingleSeries(query);
-        if (series == null) {
+        if (Objects.isNull(series)) {
             return Collections.emptyList();
         }
         series.getValues().forEach(values -> {
@@ -72,7 +76,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
         }
 
         query = select()
-            .function("bottom", SegmentRecord.START_TIME, segments.size())
+            .function(InfluxConstants.SORT_ASC, SegmentRecord.START_TIME, segments.size())
             .column(SegmentRecord.SEGMENT_ID)
             .column(SegmentRecord.START_TIME)
             .column(SegmentRecord.ENDPOINT_NAME)
@@ -130,8 +134,15 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
             .and(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence))
             .and(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence));
 
+        QueryResult.Series series = client.queryForSingleSeries(query);
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result: {}", query.getCommand(), series);
+        }
+        if (Objects.isNull(series)) {
+            return Collections.EMPTY_LIST;
+        }
         ArrayList<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
-        client.queryForSingleSeries(query).getValues().forEach(values -> {
+        series.getValues().forEach(values -> {
             ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
 
             record.setTaskId((String) values.get(1));
@@ -165,7 +176,10 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
                 .where()
                 .and(eq(SegmentRecord.SEGMENT_ID, segmentId));
         List<QueryResult.Series> series = client.queryForSeries(query);
-        if (series == null || series.isEmpty()) {
+        if (log.isDebugEnabled()) {
+            log.debug("SQL: {} result set: {}", query.getCommand(), series);
+        }
+        if (Objects.isNull(series) || series.isEmpty()) {
             return null;
         }
 
@@ -198,10 +212,6 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
             .and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
             .and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start))
             .and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end));
-        QueryResult.Series series = client.queryForSingleSeries(query);
-        if (series == null) {
-            return -1;
-        }
-        return ((Number) series.getValues().get(0).get(1)).intValue();
+        return client.getCounter(query);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java
index c247dd6..1dda92c 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java
@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.query.entity.Order;
 import org.apache.skywalking.oap.server.core.query.entity.TopNRecord;
 import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.querybuilder.WhereQueryImpl;
 
@@ -50,11 +50,11 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
     @Override
     public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName,
                                            String serviceId, int topN, Order order) throws IOException {
-        String function = "bottom";
+        String function = InfluxConstants.SORT_ASC;
         // Have to re-sort here. Because the function, top()/bottom(), get the result ordered by the `time`.
         Comparator<TopNRecord> comparator = Comparator.comparingLong(TopNRecord::getLatency);
         if (order.equals(Order.DES)) {
-            function = "top";
+            function = InfluxConstants.SORT_DES;
             comparator = (a, b) -> Long.compare(b.getLatency(), a.getLatency());
         }
 
@@ -68,7 +68,7 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
             .and(lte(TopN.TIME_BUCKET, endSecondTB));
 
         if (StringUtil.isNotEmpty(serviceId)) {
-            query.and(eq(RecordDAO.TAG_SERVICE_ID, serviceId));
+            query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
         }
 
         QueryResult.Series series = client.queryForSingleSeries(query);
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java
index 804a785..67b6f4a 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java
@@ -29,14 +29,17 @@ import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.S
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
 import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.entity.Call;
 import org.apache.skywalking.oap.server.core.source.DetectPoint;
 import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
+import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.SelectSubQueryImpl;
 import org.influxdb.querybuilder.WhereNested;
-import org.influxdb.querybuilder.WhereQueryImpl;
+import org.influxdb.querybuilder.WhereSubQueryImpl;
 
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
@@ -56,7 +59,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
                                                                           long endTB,
                                                                           List<String> serviceIds) throws IOException {
         String measurement = ServiceRelationServerSideMetrics.INDEX_NAME;
-        WhereQueryImpl query = buildServiceCallsQuery(
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
             measurement,
             startTB,
             endTB,
@@ -64,7 +67,8 @@ public class TopologyQuery implements ITopologyQueryDAO {
             ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
             serviceIds
         );
-        return buildServiceCalls(query, DetectPoint.SERVER);
+
+        return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER);
     }
 
     @Override
@@ -72,7 +76,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
                                                                          long endTB,
                                                                          List<String> serviceIds) throws IOException {
         String measurement = ServiceRelationClientSideMetrics.INDEX_NAME;
-        WhereQueryImpl query = buildServiceCallsQuery(
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
             measurement,
             startTB,
             endTB,
@@ -80,14 +84,14 @@ public class TopologyQuery implements ITopologyQueryDAO {
             ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
             serviceIds
         );
-        return buildServiceCalls(query, DetectPoint.CLIENT);
+        return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
     }
 
     @Override
     public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(DownSampling downsampling, long startTB,
                                                                           long endTB) throws IOException {
         String measurement = ServiceRelationServerSideMetrics.INDEX_NAME;
-        WhereQueryImpl query = buildServiceCallsQuery(
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
             measurement,
             startTB,
             endTB,
@@ -95,14 +99,14 @@ public class TopologyQuery implements ITopologyQueryDAO {
             ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
             new ArrayList<>(0)
         );
-        return buildServiceCalls(query, DetectPoint.SERVER);
+        return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER);
     }
 
     @Override
     public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(DownSampling downsampling, long startTB,
                                                                          long endTB) throws IOException {
         String tableName = ServiceRelationClientSideMetrics.INDEX_NAME;
-        WhereQueryImpl query = buildServiceCallsQuery(
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
             tableName,
             startTB,
             endTB,
@@ -110,7 +114,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
             ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
             new ArrayList<>(0)
         );
-        return buildServiceCalls(query, DetectPoint.CLIENT);
+        return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
     }
 
     @Override
@@ -120,14 +124,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
                                                                           long startTB,
                                                                           long endTB) throws IOException {
         String measurement = ServiceInstanceRelationServerSideMetrics.INDEX_NAME;
-        WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement,
-                                                              startTB,
-                                                              endTB,
-                                                              ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
-                                                              ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
-                                                              clientServiceId, serverServiceId
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
+            measurement,
+            startTB,
+            endTB,
+            ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
+            ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
+            clientServiceId, serverServiceId
         );
-        return buildInstanceCalls(query, DetectPoint.SERVER);
+        return buildInstanceCalls(buildQuery(subQuery), DetectPoint.SERVER);
     }
 
     @Override
@@ -137,14 +142,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
                                                                           long startTB,
                                                                           long endTB) throws IOException {
         String measurement = ServiceInstanceRelationClientSideMetrics.INDEX_NAME;
-        WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement,
-                                                              startTB,
-                                                              endTB,
-                                                              ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
-                                                              ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
-                                                              clientServiceId, serverServiceId
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
+            measurement,
+            startTB,
+            endTB,
+            ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
+            ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
+            clientServiceId, serverServiceId
         );
-        return buildInstanceCalls(query, DetectPoint.CLIENT);
+        return buildInstanceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
     }
 
     @Override
@@ -154,7 +160,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
                                                       String destEndpointId) throws IOException {
         String measurement = EndpointRelationServerSideMetrics.INDEX_NAME;
 
-        WhereQueryImpl query = buildServiceCallsQuery(
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
             measurement,
             startTB,
             endTB,
@@ -162,9 +168,9 @@ public class TopologyQuery implements ITopologyQueryDAO {
             EndpointRelationServerSideMetrics.DEST_ENDPOINT,
             Collections.emptyList()
         );
-        query.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
+        subQuery.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
 
-        WhereQueryImpl query2 = buildServiceCallsQuery(
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery2 = buildServiceCallsQuery(
             measurement,
             startTB,
             endTB,
@@ -172,61 +178,73 @@ public class TopologyQuery implements ITopologyQueryDAO {
             EndpointRelationServerSideMetrics.DEST_ENDPOINT,
             Collections.emptyList()
         );
-        query2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
+        subQuery2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
 
-        List<Call.CallDetail> calls = buildEndpointCalls(query, DetectPoint.SERVER);
-        calls.addAll(buildEndpointCalls(query2, DetectPoint.CLIENT));
+        List<Call.CallDetail> calls = buildEndpointCalls(buildQuery(subQuery), DetectPoint.SERVER);
+        calls.addAll(buildEndpointCalls(buildQuery(subQuery), DetectPoint.CLIENT));
         return calls;
     }
 
-    private WhereQueryImpl buildServiceCallsQuery(String measurement, long startTB, long endTB, String sourceCName,
-                                                  String destCName, List<String> serviceIds) {
-        WhereQueryImpl query = select()
-            .function("distinct", Metrics.ENTITY_ID, ServiceRelationServerSideMetrics.COMPONENT_ID)
-            .from(client.getDatabase(), measurement)
+    private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceCallsQuery(
+        String measurement,
+        long startTB,
+        long endTB,
+        String sourceCName,
+        String destCName,
+        List<String> serviceIds) {
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+            .fromSubQuery(client.getDatabase())
+            .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
+            .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
+            .from(measurement)
             .where()
             .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
             .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)));
 
         if (!serviceIds.isEmpty()) {
-            WhereNested whereNested = query.andNested();
+            WhereNested whereNested = subQuery.andNested();
             for (String id : serviceIds) {
                 whereNested.or(eq(sourceCName, id))
                            .or(eq(destCName, id));
             }
             whereNested.close();
         }
-        return query;
+        return subQuery;
     }
 
-    private WhereQueryImpl buildServiceInstanceCallsQuery(String measurement,
-                                                          long startTB,
-                                                          long endTB,
-                                                          String sourceCName,
-                                                          String destCName,
-                                                          String sourceServiceId,
-                                                          String destServiceId) {
-        WhereQueryImpl query = select()
-            .function("distinct", Metrics.ENTITY_ID, ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
-            .from(client.getDatabase(), measurement)
+    private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceInstanceCallsQuery(
+        String measurement,
+        long startTB,
+        long endTB,
+        String sourceCName,
+        String destCName,
+        String sourceServiceId,
+        String destServiceId) {
+
+        WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+            .fromSubQuery(client.getDatabase())
+            .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
+            .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
+            .from(measurement)
             .where()
             .and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
             .and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)));
 
         StringBuilder builder = new StringBuilder("((");
-        builder.append(sourceCName).append("=").append(sourceServiceId)
-               .append(" and ")
-               .append(destCName).append("=").append(destServiceId)
-               .append(") or (")
-               .append(sourceCName).append("=").append(destServiceId)
-               .append(") and (")
-               .append(destCName).append("=").append(sourceServiceId)
-               .append("))");
-        query.where(builder.toString());
-        return query;
+        builder.append(sourceCName).append("='").append(sourceServiceId)
+               .append("' and ")
+               .append(destCName).append("='").append(destServiceId)
+               .append("') or (")
+               .append(sourceCName).append("='").append(destServiceId)
+               .append("') and (")
+               .append(destCName).append("='").append(sourceServiceId)
+               .append("'))");
+        subQuery.where(builder.toString());
+        subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID);
+        return subQuery;
     }
 
-    private List<Call.CallDetail> buildServiceCalls(WhereQueryImpl query,
+    private List<Call.CallDetail> buildServiceCalls(Query query,
                                                     DetectPoint detectPoint) throws IOException {
         QueryResult.Series series = client.queryForSingleSeries(query);
 
@@ -240,7 +258,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
         List<Call.CallDetail> calls = new ArrayList<>();
         series.getValues().forEach(values -> {
             Call.CallDetail call = new Call.CallDetail();
-            String entityId = (String) values.get(1);
+            String entityId = String.valueOf(values.get(1));
             int componentId = (int) values.get(2);
             call.buildFromServiceRelation(entityId, componentId, detectPoint);
             calls.add(call);
@@ -248,7 +266,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
         return calls;
     }
 
-    private List<Call.CallDetail> buildInstanceCalls(WhereQueryImpl query,
+    private Query buildQuery(WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery) {
+        SelectQueryImpl query = select().column(InfluxConstants.TagName.ENTITY_ID)
+                                        .column(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
+                                        .from(client.getDatabase());
+        query.setSubQuery(subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID));
+        return query;
+    }
+
+    private List<Call.CallDetail> buildInstanceCalls(Query query,
                                                      DetectPoint detectPoint) throws IOException {
         QueryResult.Series series = client.queryForSingleSeries(query);
 
@@ -270,7 +296,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
         return calls;
     }
 
-    private List<Call.CallDetail> buildEndpointCalls(WhereQueryImpl query,
+    private List<Call.CallDetail> buildEndpointCalls(Query query,
                                                      DetectPoint detectPoint) throws IOException {
         QueryResult.Series series = client.queryForSingleSeries(query);
 
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
index eac9706..9e577fa 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
@@ -34,7 +34,7 @@ import org.apache.skywalking.oap.server.core.query.entity.TraceState;
 import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
 import org.elasticsearch.common.Strings;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
@@ -78,7 +78,7 @@ public class TraceQuery implements ITraceQueryDAO {
         }
 
         WhereQueryImpl<SelectQueryImpl> recallQuery = select()
-            .function("top", orderBy, limit + from)
+            .function(InfluxConstants.SORT_DES, orderBy, limit + from)
             .column(SegmentRecord.SEGMENT_ID)
             .column(SegmentRecord.START_TIME)
             .column(SegmentRecord.ENDPOINT_NAME)
@@ -102,7 +102,7 @@ public class TraceQuery implements ITraceQueryDAO {
             recallQuery.and(contains(SegmentRecord.ENDPOINT_NAME, endpointName.replaceAll("/", "\\\\/")));
         }
         if (StringUtil.isNotEmpty(serviceId)) {
-            recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId)));
+            recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
         }
         if (StringUtil.isNotEmpty(serviceInstanceId)) {
             recallQuery.and(eq(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
@@ -201,9 +201,9 @@ public class TraceQuery implements ITraceQueryDAO {
             segmentRecord.setEndTime((long) values.get(7));
             segmentRecord.setLatency((int) values.get(8));
             segmentRecord.setIsError((int) values.get(9));
-            segmentRecord.setVersion((int) values.get(10));
+            segmentRecord.setVersion((int) values.get(11));
 
-            String base64 = (String) values.get(9);
+            String base64 = (String) values.get(10);
             if (!Strings.isNullOrEmpty(base64)) {
                 segmentRecord.setDataBinary(Base64.getDecoder().decode(base64));
             }
diff --git a/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml b/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml
index 4f0dc47..377ea79 100644
--- a/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml
+++ b/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml
@@ -22,8 +22,6 @@ services:
       - 8086
     networks:
       - e2e
-    depends_on:
-      - h2db
     healthcheck:
       test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
       interval: 5s
@@ -37,8 +35,6 @@ services:
     environment:
       SW_STORAGE: influxdb
     depends_on:
-      h2db:
-        condition: service_healthy
       influxdb:
         condition: service_healthy
 
diff --git a/test/plugin/run.sh b/test/plugin/run.sh
index 56b6c62..d81ecff 100755
--- a/test/plugin/run.sh
+++ b/test/plugin/run.sh
@@ -187,7 +187,6 @@ fi
 supported_versions=`grep -v -E "^$|^#" ${supported_version_file}`
 for version in ${supported_versions}
 do
-    waitForAvailable
     testcase_name="${scenario_name}-${version}"
 
     # testcase working directory, there are logs, data and packages.
@@ -218,8 +217,10 @@ do
     [[ $? -ne 0 ]] && exitWithMessage "${testcase_name}, generate script failure!"
 
     echo "start container of testcase.name=${testcase_name}"
-    bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log &
+    bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log
     sleep 3
+    waitForAvailable
+    rm -rf ${case_work_base}
 done
 
 echo -e "\033[33m${scenario_name} has already sumbitted\033[0m"
diff --git a/test/plugin/scenarios/mysql-scenario/support-version.list b/test/plugin/scenarios/mysql-scenario/support-version.list
index 4768268..bf4ddf6 100644
--- a/test/plugin/scenarios/mysql-scenario/support-version.list
+++ b/test/plugin/scenarios/mysql-scenario/support-version.list
@@ -36,13 +36,4 @@
 5.1.26
 5.1.24
 5.1.22
-5.1.20
-5.1.18
-5.1.16
-5.1.14
-5.1.12
-5.1.10
-5.1.8
-5.1.6
-5.1.4
-5.1.2
\ No newline at end of file
+5.1.20
\ No newline at end of file
diff --git a/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml b/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml
index aea4ef8..be366df 100644
--- a/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml
+++ b/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml
@@ -25,6 +25,7 @@ dependencies:
   solr-server:
     image: solr:${CASE_SERVER_IMAGE_VERSION}
     hostname: solr-server
+    removeOnExit: true
     entrypoint:
       - docker-entrypoint.sh
       - solr-precreate