You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/07/20 14:59:57 UTC

[skywalking] branch master updated: Improve OAP server performance. (#3127)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 6338de3  Improve OAP server performance. (#3127)
6338de3 is described below

commit 6338de392fe988e4bf46e8a6c5aadb92b946ddab
Author: 彭勇升 pengys <pe...@apache.org>
AuthorDate: Sat Jul 20 22:59:52 2019 +0800

    Improve OAP server performance. (#3127)
    
    * Improve OAP server performance.
---
 .../analysis/worker/MetricsPersistentWorker.java   | 76 +++++++++++------
 .../analysis/worker/MetricsStreamProcessor.java    |  8 +-
 .../analysis/worker/RecordStreamProcessor.java     |  4 +-
 .../core/analysis/worker/TopNStreamProcessor.java  |  2 +-
 .../register/worker/InventoryStreamProcessor.java  |  2 +-
 .../oap/server/core/storage/IMetricsDAO.java       |  3 +-
 .../oap/server/core/storage/PersistenceTimer.java  | 98 +++++++++++++++-------
 .../oap/server/core/storage/StorageDAO.java        |  4 +-
 .../server/core/storage/model/IModelSetter.java    |  2 +-
 .../oap/server/core/storage/model/Model.java       |  4 +-
 .../server/core/storage/model/StorageModels.java   |  4 +-
 .../client/elasticsearch/ElasticSearchClient.java  | 28 ++-----
 .../elasticsearch/ITElasticSearchClient.java       | 31 +++----
 .../src/main/assembly/application.yml              |  3 +-
 .../src/main/resources/application.yml             |  3 +-
 .../StorageModuleElasticsearchConfig.java          |  1 -
 .../StorageModuleElasticsearchProvider.java        |  2 +-
 .../elasticsearch/base/BatchProcessEsDAO.java      |  6 +-
 .../plugin/elasticsearch/base/MetricsEsDAO.java    | 20 +++--
 .../plugin/elasticsearch/base/StorageEsDAO.java    |  6 +-
 .../elasticsearch/base/StorageEsInstaller.java     |  7 +-
 .../elasticsearch/query/MetricsQueryEsDAO.java     | 22 +++--
 .../storage/plugin/jdbc/h2/dao/H2MetricsDAO.java   |  7 +-
 .../storage/plugin/jdbc/h2/dao/H2SQLExecutor.java  | 31 ++-----
 .../storage/plugin/jdbc/h2/dao/H2StorageDAO.java   | 12 ++-
 25 files changed, 211 insertions(+), 175 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index d9b27f8..4e819b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -31,8 +31,6 @@ import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.slf4j.*;
 
-import static java.util.Objects.nonNull;
-
 /**
  * @author peng-yongsheng
  */
@@ -42,7 +40,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
 
     private final Model model;
     private final MergeDataCache<Metrics> mergeDataCache;
-    private final IMetricsDAO metricsDAO;
+    private final IMetricsDAO<?, ?> metricsDAO;
     private final AbstractWorker<Metrics> nextAlarmWorker;
     private final AbstractWorker<ExportEvent> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
@@ -99,40 +97,64 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     }
 
     @Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
+        long start = System.currentTimeMillis();
+
         List<Object> batchCollection = new LinkedList<>();
-        cache.getLast().collection().forEach(data -> {
+
+        Collection<Metrics> collection = cache.getLast().collection();
+
+        int i = 0;
+        Metrics[] metrics = null;
+        for (Metrics data : collection) {
             if (Objects.nonNull(nextExportWorker)) {
                 ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
                 nextExportWorker.in(event);
             }
 
-            Metrics dbData = null;
-            try {
-                dbData = metricsDAO.get(model, data);
-            } catch (Throwable t) {
-                logger.error(t.getMessage(), t);
-            }
-            try {
-                if (nonNull(dbData)) {
-                    data.combine(dbData);
-                    data.calculate();
-
-                    batchCollection.add(metricsDAO.prepareBatchUpdate(model, data));
+            int batchGetSize = 2000;
+            int mod = i % batchGetSize;
+            if (mod == 0) {
+                int residual = collection.size() - i;
+                if (residual >= batchGetSize) {
+                    metrics = new Metrics[batchGetSize];
                 } else {
-                    batchCollection.add(metricsDAO.prepareBatchInsert(model, data));
+                    metrics = new Metrics[residual];
                 }
-
-                if (Objects.nonNull(nextAlarmWorker)) {
-                    nextAlarmWorker.in(data);
-                }
-                if (Objects.nonNull(nextExportWorker)) {
-                    ExportEvent event = new ExportEvent(data, ExportEvent.EventType.TOTAL);
-                    nextExportWorker.in(event);
+            }
+            metrics[mod] = data;
+
+            if (mod == metrics.length - 1) {
+                try {
+                    Map<String, Metrics> dbMetricsMap = metricsDAO.get(model, metrics);
+
+                    for (Metrics metric : metrics) {
+                        if (dbMetricsMap.containsKey(metric.id())) {
+                            metric.combine(dbMetricsMap.get(metric.id()));
+                            metric.calculate();
+                            batchCollection.add(metricsDAO.prepareBatchUpdate(model, metric));
+                        } else {
+                            batchCollection.add(metricsDAO.prepareBatchInsert(model, metric));
+                        }
+
+                        if (Objects.nonNull(nextAlarmWorker)) {
+                            nextAlarmWorker.in(metric);
+                        }
+                        if (Objects.nonNull(nextExportWorker)) {
+                            ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
+                            nextExportWorker.in(event);
+                        }
+                    }
+                } catch (Throwable t) {
+                    logger.error(t.getMessage(), t);
                 }
-            } catch (Throwable t) {
-                logger.error(t.getMessage(), t);
             }
-        });
+
+            i++;
+        }
+
+        if (batchCollection.size() > 0) {
+            logger.debug("prepareBatch model {}, took time: {}", model.getName(), System.currentTimeMillis() - start);
+        }
 
         return batchCollection;
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 20d09bf..dfd4b20 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -75,19 +75,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
         MetricsPersistentWorker monthPersistentWorker = null;
 
         if (configService.shouldToHour()) {
-            Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour));
+            Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
             hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
         }
         if (configService.shouldToDay()) {
-            Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day));
+            Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
             dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
         }
         if (configService.shouldToMonth()) {
-            Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month));
+            Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
             monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
         }
 
-        Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute));
+        Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
         MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
 
         MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 6483a11..b84d99b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -65,8 +65,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
         }
 
         IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
-        Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second));
-        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO);
+        Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
+        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 4000, recordDAO);
 
         persistentWorkers.add(persistentWorker);
         workers.put(recordClass, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 03ae693..fcb3a68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -61,7 +61,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
         }
 
         IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
-        Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second));
+        Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
 
         TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO);
         persistentWorkers.add(persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 596ab81..4f9f3f7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -56,7 +56,7 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
         }
 
         IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
-        Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
+        Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
 
         StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
         streamDataMappingSetter.putIfAbsent(inventoryClass);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 29ec617..5c2e246 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 
@@ -27,7 +28,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
  */
 public interface IMetricsDAO<INSERT, UPDATE> extends DAO {
 
-    Metrics get(Model model, Metrics metrics) throws IOException;
+    Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException;
 
     INSERT prepareBatchInsert(Model model, Metrics metrics) throws IOException;
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 08cced4..ffe3480 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -61,17 +61,20 @@ public enum PersistenceTimer {
 
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
-                new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
-                    t -> logger.error("Extract data and save failure.", t)), 1, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+                new RunnableWithExceptionProtection(() -> extractDataAndSaveRecord(batchDAO),
+                    t -> logger.error("Extract data and save record failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+
+            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+                new RunnableWithExceptionProtection(() -> extractDataAndSaveMetrics(batchDAO),
+                    t -> logger.error("Extract data and save metrics failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
 
             this.isStarted = true;
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private void extractDataAndSave(IBatchDAO batchDAO) {
+    private void extractDataAndSaveRecord(IBatchDAO batchDAO) {
         if (logger.isDebugEnabled()) {
-            logger.debug("Extract data and save");
+            logger.debug("Extract data and save record");
         }
 
         long startTime = System.currentTimeMillis();
@@ -79,36 +82,12 @@ public enum PersistenceTimer {
             HistogramMetrics.Timer timer = prepareLatency.createTimer();
 
             List records = new LinkedList();
-            List metrics = new LinkedList();
             try {
                 List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-                persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
                 persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
                 persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
 
-                persistenceWorkers.forEach(worker -> {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("extract {} worker data and save", worker.getClass().getName());
-                    }
-
-                    if (worker.flushAndSwitch()) {
-                        List<?> batchCollection = worker.buildBatchCollection();
-
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
-                        }
-
-                        if (worker instanceof RecordPersistentWorker) {
-                            records.addAll(batchCollection);
-                        } else if (worker instanceof MetricsPersistentWorker) {
-                            metrics.addAll(batchCollection);
-                        } else if (worker instanceof TopNWorker) {
-                            records.addAll(batchCollection);
-                        } else {
-                            logger.error("Missing the worker {}", worker.getClass().getSimpleName());
-                        }
-                    }
-                });
+                buildBatchCollection(persistenceWorkers, records);
 
                 if (debug) {
                     logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
@@ -122,6 +101,46 @@ public enum PersistenceTimer {
                 if (CollectionUtils.isNotEmpty(records)) {
                     batchDAO.asynchronous(records);
                 }
+            } finally {
+                executeLatencyTimer.finish();
+            }
+        } catch (Throwable e) {
+            errorCounter.inc();
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Persistence data save finish");
+            }
+        }
+
+        if (debug) {
+            logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+        }
+    }
+
+    private void extractDataAndSaveMetrics(IBatchDAO batchDAO) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Extract data and save metrics");
+        }
+
+        long startTime = System.currentTimeMillis();
+        try {
+            HistogramMetrics.Timer timer = prepareLatency.createTimer();
+
+            List metrics = new LinkedList();
+            try {
+                List<PersistenceWorker> persistenceWorkers = new ArrayList<>(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+                buildBatchCollection(persistenceWorkers, metrics);
+
+                if (debug) {
+                    logger.info("build metrics batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+                }
+            } finally {
+                timer.finish();
+            }
+
+            HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
+            try {
                 if (CollectionUtils.isNotEmpty(metrics)) {
                     batchDAO.synchronous(metrics);
                 }
@@ -141,4 +160,23 @@ public enum PersistenceTimer {
             logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
         }
     }
+
+    @SuppressWarnings("unchecked")
+    private void buildBatchCollection(List<PersistenceWorker> persistenceWorkers, List collection) {
+        persistenceWorkers.forEach(worker -> {
+            if (logger.isDebugEnabled()) {
+                logger.debug("extract {} worker data and save", worker.getClass().getName());
+            }
+
+            if (worker.flushAndSwitch()) {
+                List<?> batchCollection = worker.buildBatchCollection();
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+                }
+
+                collection.addAll(batchCollection);
+            }
+        });
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
index 4d3f26d..35d178d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
@@ -26,9 +26,9 @@ import org.apache.skywalking.oap.server.library.module.Service;
 /**
  * @author peng-yongsheng
  */
-public interface StorageDAO extends Service {
+public interface StorageDAO<INSERT, UPDATE> extends Service {
 
-    IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder);
+    IMetricsDAO<INSERT, UPDATE> newMetricsDao(StorageBuilder<Metrics> storageBuilder);
 
     IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder);
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
index a471212..b30c457 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
@@ -26,5 +26,5 @@ import org.apache.skywalking.oap.server.library.module.Service;
  */
 public interface IModelSetter extends Service {
 
-    Model putIfAbsent(Class aClass, int scopeId, Storage storage);
+    Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index b2dccc9..9693ef7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -34,13 +34,15 @@ public class Model {
     private final boolean deleteHistory;
     private final List<ModelColumn> columns;
     private final int scopeId;
+    private final boolean record;
 
-    public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling) {
+    public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling, boolean record) {
         this.columns = columns;
         this.capableOfTimeSeries = capableOfTimeSeries;
         this.downsampling = downsampling;
         this.deleteHistory = deleteHistory;
         this.scopeId = scopeId;
         this.name = ModelName.build(downsampling, name);
+        this.record = record;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 2642523..3ccf594 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -37,7 +37,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
         this.models = new LinkedList<>();
     }
 
-    @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage) {
+    @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record) {
         // Check this scope id is valid.
         DefaultScopeDefine.nameOf(scopeId);
 
@@ -50,7 +50,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
         List<ModelColumn> modelColumns = new LinkedList<>();
         retrieval(aClass, storage.getModelName(), modelColumns);
 
-        Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling());
+        Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling(), record);
         models.add(model);
 
         return model;
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index a4a4ec6..cbee9e2 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -40,10 +40,9 @@ import org.elasticsearch.action.search.*;
 import org.elasticsearch.action.support.*;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.*;
-import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.*;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.*;
 
@@ -227,29 +226,13 @@ public class ElasticSearchClient implements Client {
         return client.get(request);
     }
 
-    public SearchResponse idQuery(String indexName, String id) throws IOException {
-        indexName = formatIndexName(indexName);
-
-        SearchRequest searchRequest = new SearchRequest(indexName);
-        searchRequest.types(TYPE);
-        searchRequest.source().query(QueryBuilders.idsQuery().addIds(id));
-        return client.search(searchRequest);
-    }
-
-    public Map<String, Map<String, Object>> ids(String indexName, String... ids) throws IOException {
+    public SearchResponse ids(String indexName, String[] ids) throws IOException {
         indexName = formatIndexName(indexName);
 
         SearchRequest searchRequest = new SearchRequest(indexName);
         searchRequest.types(TYPE);
         searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
-        SearchResponse response = client.search(searchRequest);
-
-        Map<String, Map<String, Object>> result = new HashMap<>();
-        SearchHit[] hits = response.getHits().getHits();
-        for (SearchHit hit : hits) {
-            result.put(hit.getId(), hit.getSourceAsMap());
-        }
-        return result;
+        return client.search(searchRequest);
     }
 
     public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
@@ -312,7 +295,7 @@ public class ElasticSearchClient implements Client {
         }
     }
 
-    public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) {
+    public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
         BulkProcessor.Listener listener = new BulkProcessor.Listener() {
             @Override
             public void beforeBulk(long executionId, BulkRequest request) {
@@ -325,7 +308,7 @@ public class ElasticSearchClient implements Client {
                 if (response.hasFailures()) {
                     logger.warn("Bulk [{}] executed with failures", executionId);
                 } else {
-                    logger.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
+                    logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook().getMillis(), request.requests().size());
                 }
             }
 
@@ -337,7 +320,6 @@ public class ElasticSearchClient implements Client {
 
         return BulkProcessor.builder(client::bulkAsync, listener)
             .setBulkActions(bulkActions)
-            .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
             .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
             .setConcurrentRequests(concurrentRequests)
             .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index c0c0854..6beba6b 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -18,8 +18,10 @@
 
 package org.apache.skywalking.oap.server.library.client.elasticsearch;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
+import com.google.gson.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.HttpGet;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -27,26 +29,13 @@ import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.xcontent.*;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.powermock.reflect.Whitebox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -188,7 +177,7 @@ public class ITElasticSearchClient {
 
     @Test
     public void bulk() throws InterruptedException {
-        BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 200, 10, 2);
+        BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 10, 2);
 
         Map<String, String> source = new HashMap<>();
         source.put("column1", "value1");
@@ -246,7 +235,7 @@ public class ITElasticSearchClient {
     }
 
     private RestHighLevelClient getRestHighLevelClient() {
-        return (RestHighLevelClient) Whitebox.getInternalState(client, "client");
+        return (RestHighLevelClient)Whitebox.getInternalState(client, "client");
     }
 
     private JsonObject undoFormatIndexName(JsonObject index) {
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index c5b01a3..bc4b745 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,8 +75,7 @@ storage:
 #    otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
 #    monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
 #    # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
-#    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
-#    bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
+#    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
 #    flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
 #    concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
 #    metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 665a852..ec79843 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,8 +75,7 @@ storage:
     otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
     monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
     # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
-    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
-    bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
+    bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
     flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
     concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
     metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index bb5671e..48f4e0c 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -32,7 +32,6 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
     @Setter private int indexReplicasNumber = 0;
     @Setter private int indexRefreshInterval = 2;
     @Setter private int bulkActions = 2000;
-    @Setter private int bulkSize = 20;
     @Setter private int flushInterval = 10;
     @Setter private int concurrentRequests = 2;
     @Setter private int syncBulkActions = 3;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 58e8383..63d936b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -68,7 +68,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         }
         elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword());
 
-        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
+        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
         this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
         this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL()));
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index a3d52e9..d4d118d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -36,22 +36,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
 
     private BulkProcessor bulkProcessor;
     private final int bulkActions;
-    private final int bulkSize;
     private final int flushInterval;
     private final int concurrentRequests;
 
-    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
+    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int flushInterval,
         int concurrentRequests) {
         super(client);
         this.bulkActions = bulkActions;
-        this.bulkSize = bulkSize;
         this.flushInterval = flushInterval;
         this.concurrentRequests = concurrentRequests;
     }
 
     @Override public void asynchronous(List<?> collection) {
         if (bulkProcessor == null) {
-            this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
+            this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
         }
 
         if (logger.isDebugEnabled()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index e313c60..a0c8212 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
 import java.io.IOException;
+import java.util.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -40,13 +41,20 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO<IndexRequest, Upd
         this.storageBuilder = storageBuilder;
     }
 
-    @Override public Metrics get(Model model, Metrics metrics) throws IOException {
-        SearchResponse response = getClient().idQuery(model.getName(), metrics.id());
-        if (response.getHits().totalHits > 0) {
-            return storageBuilder.map2Data(response.getHits().getAt(0).getSourceAsMap());
-        } else {
-            return null;
+    @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
+        Map<String, Metrics> result = new HashMap<>();
+
+        String[] ids = new String[metrics.length];
+        for (int i = 0; i < metrics.length; i++) {
+            ids[i] = metrics[i].id();
+        }
+
+        SearchResponse response = getClient().ids(model.getName(), ids);
+        for (int i = 0; i < response.getHits().totalHits; i++) {
+            Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap());
+            result.put(source.id(), source);
         }
+        return result;
     }
 
     @Override public IndexRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
index 52fe3fb..318b9a8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
@@ -23,17 +23,19 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
  * @author peng-yongsheng
  */
-public class StorageEsDAO extends EsDAO implements StorageDAO {
+public class StorageEsDAO extends EsDAO implements StorageDAO<IndexRequest, UpdateRequest> {
 
     public StorageEsDAO(ElasticSearchClient client) {
         super(client);
     }
 
-    @Override public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
+    @Override public IMetricsDAO<IndexRequest, UpdateRequest> newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
         return new MetricsEsDAO(getClient(), storageBuilder);
     }
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 7883049..0246ced 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -64,7 +64,7 @@ public class StorageEsInstaller extends ModelInstaller {
     @Override protected void createTable(Client client, Model model) throws StorageException {
         ElasticSearchClient esClient = (ElasticSearchClient)client;
 
-        JsonObject settings = createSetting();
+        JsonObject settings = createSetting(model.isRecord());
         JsonObject mapping = createMapping(model);
         logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping.toString());
 
@@ -97,13 +97,12 @@ public class StorageEsInstaller extends ModelInstaller {
         }
     }
 
-    private JsonObject createSetting() {
+    private JsonObject createSetting(boolean record) {
         JsonObject setting = new JsonObject();
         setting.addProperty("index.number_of_shards", indexShardsNumber);
         setting.addProperty("index.number_of_replicas", indexReplicasNumber);
-        setting.addProperty("index.refresh_interval", TimeValue.timeValueSeconds(indexRefreshInterval).toString());
+        setting.addProperty("index.refresh_interval", record ? TimeValue.timeValueSeconds(10).toString() : TimeValue.timeValueSeconds(indexRefreshInterval).toString());
         setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
-        TimeValue.timeValueSeconds(3);
         return setting;
     }
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 7df8ba6..dd36e41 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.*;
 import org.elasticsearch.search.aggregations.metrics.avg.Avg;
@@ -102,15 +103,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
     @Override public IntValues getLinearIntValues(String indName, Downsampling downsampling, List<String> ids, String valueCName) throws IOException {
         String indexName = ModelName.build(downsampling, indName);
 
-        Map<String, Map<String, Object>> response = getClient().ids(indexName, ids.toArray(new String[0]));
+        SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0]));
+        Map<String, Map<String, Object>> idMap = toMap(response);
 
         IntValues intValues = new IntValues();
         for (String id : ids) {
             KVInt kvInt = new KVInt();
             kvInt.setId(id);
             kvInt.setValue(0);
-            if (response.containsKey(id)) {
-                Map<String, Object> source = response.get(id);
+            if (idMap.containsKey(id)) {
+                Map<String, Object> source = idMap.get(id);
                 kvInt.setValue(((Number)source.getOrDefault(valueCName, 0)).longValue());
             }
             intValues.getValues().add(kvInt);
@@ -125,11 +127,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
         Thermodynamic thermodynamic = new Thermodynamic();
         List<List<Long>> thermodynamicValueMatrix = new ArrayList<>();
 
-        Map<String, Map<String, Object>> response = getClient().ids(indexName, ids.toArray(new String[0]));
+        SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0]));
+        Map<String, Map<String, Object>> idMap = toMap(response);
 
         int numOfSteps = 0;
         for (String id : ids) {
-            Map<String, Object> source = response.get(id);
+            Map<String, Object> source = idMap.get(id);
             if (source == null) {
                 // add empty list to represent no data exist for this time bucket
                 thermodynamicValueMatrix.add(new ArrayList<>());
@@ -159,4 +162,13 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
 
         return thermodynamic;
     }
+
+    private Map<String, Map<String, Object>> toMap(SearchResponse response) {
+        Map<String, Map<String, Object>> result = new HashMap<>();
+        SearchHit[] hits = response.getHits().getHits();
+        for (SearchHit hit : hits) {
+            result.put(hit.getId(), hit.getSourceAsMap());
+        }
+        return result;
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
index 70318cc..1943c94 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -29,6 +30,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
  * @author wusheng
  */
 public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO<SQLExecutor, SQLExecutor> {
+
     private JDBCHikariCPClient h2Client;
     private StorageBuilder<Metrics> storageBuilder;
 
@@ -37,8 +39,9 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO<SQLExecut
         this.storageBuilder = storageBuilder;
     }
 
-    @Override public Metrics get(Model model, Metrics metrics) throws IOException {
-        return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder);
+    @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
+        //        return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder);
+        return null;
     }
 
     @Override public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
index 21d3d3c..3ab7d29 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
@@ -19,26 +19,17 @@
 package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.*;
+import java.util.*;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
+import org.slf4j.*;
 
 /**
  * @author wusheng
@@ -52,9 +43,7 @@ public class H2SQLExecutor {
             try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE id = ?", id)) {
                 return toStorageData(rs, modelName, storageBuilder);
             }
-        } catch (SQLException e) {
-            throw new IOException(e.getMessage(), e);
-        } catch (JDBCClientException e) {
+        } catch (SQLException | JDBCClientException e) {
             throw new IOException(e.getMessage(), e);
         }
     }
@@ -65,9 +54,7 @@ public class H2SQLExecutor {
             try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE " + columnName + " = ?", value)) {
                 return toStorageData(rs, modelName, storageBuilder);
             }
-        } catch (SQLException e) {
-            throw new IOException(e.getMessage(), e);
-        } catch (JDBCClientException e) {
+        } catch (SQLException | JDBCClientException e) {
             throw new IOException(e.getMessage(), e);
         }
     }
@@ -92,9 +79,7 @@ public class H2SQLExecutor {
                     return rs.getInt(ServiceInstanceInventory.SEQUENCE);
                 }
             }
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
-        } catch (JDBCClientException e) {
+        } catch (SQLException | JDBCClientException e) {
             logger.error(e.getMessage(), e);
         }
         return Const.NONE;
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
index d23001d..d8ef839 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
@@ -21,24 +21,22 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 /**
  * @author wusheng
  */
-public class H2StorageDAO implements StorageDAO {
+public class H2StorageDAO implements StorageDAO<SQLExecutor, SQLExecutor> {
+
     private JDBCHikariCPClient h2Client;
 
     public H2StorageDAO(JDBCHikariCPClient h2Client) {
         this.h2Client = h2Client;
     }
 
-    @Override public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
+    @Override public IMetricsDAO<SQLExecutor, SQLExecutor> newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
         return new H2MetricsDAO(h2Client, storageBuilder);
     }